From 6529a7c57f38f94df10fec339dc131fd4a21a67f Mon Sep 17 00:00:00 2001 From: willzhang4a58 Date: Wed, 23 May 2018 18:57:28 +0800 Subject: [PATCH] two buf in one stream Former-commit-id: dcf1b183c3b86ac547f9ac4581e2d05de0feb853 --- examples/mnist/predict.net | 2 +- oneflow/core/job/improver.cpp | 2 +- ...ersistent_in_stream_without_local_copy.cpp | 6 +- ..._persistent_in_stream_without_local_copy.h | 2 +- .../persistence/normal_persistent_in_stream.h | 2 + ...ersistent_in_stream_without_local_copy.cpp | 70 ++++++++++++++----- .../persistent_in_stream_without_local_copy.h | 15 +++- oneflow/core/thread/thread_pool.cpp | 34 +++++++++ oneflow/core/thread/thread_pool.h | 28 ++++++++ 9 files changed, 137 insertions(+), 24 deletions(-) create mode 100644 oneflow/core/thread/thread_pool.cpp create mode 100644 oneflow/core/thread/thread_pool.h diff --git a/examples/mnist/predict.net b/examples/mnist/predict.net index 661eda0498..c10d8436ea 100644 --- a/examples/mnist/predict.net +++ b/examples/mnist/predict.net @@ -1,7 +1,7 @@ op { name: "decode" decode_ofrecord_conf { - data_dir: "/dataset/mnist_kaggle/test/6" + data_dir: "/dataset/mnist_kaggle/train/6" blob { name: "img_raw" data_type: kFloat diff --git a/oneflow/core/job/improver.cpp b/oneflow/core/job/improver.cpp index 15e17a2042..9fcc6a81ec 100644 --- a/oneflow/core/job/improver.cpp +++ b/oneflow/core/job/improver.cpp @@ -177,7 +177,7 @@ uint64_t Improver::AvailableMemSize(int64_t machine_id, int64_t memory_zone_id) JobDesc* job_desc = Global::Get(); if (memory_zone_id == job_desc->GpuDeviceNum()) { mem_size -= job_desc->reserved_host_mem_byte(); - mem_size -= job_desc->persistence_buf_byte() * record_load_task_num_.at(machine_id); + mem_size -= job_desc->persistence_buf_byte() * 2 * record_load_task_num_.at(machine_id); } else { mem_size -= job_desc->reserved_device_mem_byte(); } diff --git a/oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.cpp b/oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.cpp index 2b67d5bf3b..3068fbd2ef 100644 --- a/oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.cpp +++ b/oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.cpp @@ -1,4 +1,5 @@ #include "oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.h" +#include "oneflow/core/job/job_desc.h" namespace oneflow { @@ -10,8 +11,9 @@ CyclicPersistentInStreamWithoutLocalCopy::CyclicPersistentInStreamWithoutLocalCo } void CyclicPersistentInStreamWithoutLocalCopy::UpdateBuffer() { - if (is_first_update_buffer_ == false && file_size() <= mut_buffer()->size() - 1) { - set_cur_buf_begin(mut_buffer()->data()); + if (is_first_update_buffer_ == false + && file_size() <= Global::Get()->persistence_buf_byte()) { + set_cur_buf_begin(mut_buffer()); } else { PersistentInStreamWithoutLocalCopy::UpdateBuffer(); } diff --git a/oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.h b/oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.h index 27fcafdbed..bcd9000934 100644 --- a/oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.h +++ b/oneflow/core/persistence/cyclic_persistent_in_stream_without_local_copy.h @@ -9,7 +9,7 @@ class CyclicPersistentInStreamWithoutLocalCopy final : public PersistentInStream public: OF_DISALLOW_COPY_AND_MOVE(CyclicPersistentInStreamWithoutLocalCopy); CyclicPersistentInStreamWithoutLocalCopy() = delete; - ~CyclicPersistentInStreamWithoutLocalCopy() = default; + ~CyclicPersistentInStreamWithoutLocalCopy() { WaitUntilStandByBufferReadyBytesNotEqualZero(); } CyclicPersistentInStreamWithoutLocalCopy(fs::FileSystem* fs, const std::string& file_path); diff --git a/oneflow/core/persistence/normal_persistent_in_stream.h b/oneflow/core/persistence/normal_persistent_in_stream.h index bd23dd0921..9535616b32 100644 --- a/oneflow/core/persistence/normal_persistent_in_stream.h +++ b/oneflow/core/persistence/normal_persistent_in_stream.h @@ -18,6 +18,8 @@ class NormalPersistentInStream final : public PersistentInStreamWithoutLocalCopy NormalPersistentInStream(fs::FileSystem* fs, const std::string& file_path) : NormalPersistentInStream(fs, file_path, 0) {} + ~NormalPersistentInStream() { WaitUntilStandByBufferReadyBytesNotEqualZero(); } + private: void AddNForCurFilePos(uint64_t n) override { set_cur_file_pos(cur_file_pos() + n); } }; diff --git a/oneflow/core/persistence/persistent_in_stream_without_local_copy.cpp b/oneflow/core/persistence/persistent_in_stream_without_local_copy.cpp index 6449951c26..c57773d5e1 100644 --- a/oneflow/core/persistence/persistent_in_stream_without_local_copy.cpp +++ b/oneflow/core/persistence/persistent_in_stream_without_local_copy.cpp @@ -1,9 +1,17 @@ #include "oneflow/core/persistence/persistent_in_stream_without_local_copy.h" #include "oneflow/core/job/job_desc.h" -#include +#include "oneflow/core/thread/thread_pool.h" namespace oneflow { +static ThreadPool g_persistent_in_thread_pool(1); + +PersistentInStreamWithoutLocalCopy::~PersistentInStreamWithoutLocalCopy() { + WaitUntilStandByBufferReadyBytesNotEqualZero(); + delete[] standby_buffer_; + delete[] buffer_; +} + int32_t PersistentInStreamWithoutLocalCopy::ReadLine(std::string* l) { if (IsEof()) { return -1; } l->clear(); @@ -27,8 +35,8 @@ int32_t PersistentInStreamWithoutLocalCopy::Read(char* s, size_t n) { while (n) { if (cur_buf_begin_ == cur_buf_end_) { UpdateBuffer(); } CHECK_LT(cur_buf_begin_, cur_buf_end_); - int64_t copy_size = std::min(cur_buf_end_ - cur_buf_begin_, static_cast(n)); - std::memcpy(s, cur_buf_begin_, static_cast(copy_size)); + size_t copy_size = std::min(cur_buf_end_ - cur_buf_begin_, n); + memcpy(s, cur_buf_begin_, copy_size); s += copy_size; cur_buf_begin_ += copy_size; n -= copy_size; @@ -41,26 +49,56 @@ PersistentInStreamWithoutLocalCopy::PersistentInStreamWithoutLocalCopy(fs::FileS uint64_t offset) { fs->NewRandomAccessFile(file_path, &file_); file_size_ = fs->GetFileSize(file_path); + CHECK_LT(offset, file_size_); + standby_buffer_ = new char[Global::Get()->persistence_buf_byte() + 1]; + standby_buffer_ready_bytes_ = 0; cur_file_pos_ = offset; - buffer_.resize(Global::Get()->persistence_buf_byte() + 1); - cur_buf_begin_ = buffer_.data(); - cur_buf_end_ = buffer_.data(); + file_read_done_ = false; + buffer_ = new char[Global::Get()->persistence_buf_byte() + 1]; + cur_buf_begin_ = buffer_; + cur_buf_end_ = buffer_; *cur_buf_end_ = '\0'; -} - -bool PersistentInStreamWithoutLocalCopy::IsEof() const { - return cur_buf_begin_ == cur_buf_end_ && cur_file_pos_ == file_size_; + AsyncUpdateStandByBuffer(); } void PersistentInStreamWithoutLocalCopy::UpdateBuffer() { CHECK_EQ(cur_buf_begin_, cur_buf_end_); - uint64_t n = std::min(buffer_.size() - 1, file_size_ - cur_file_pos_); - if (n == 0) { return; } - file_->Read(cur_file_pos_, n, buffer_.data()); - cur_buf_begin_ = buffer_.data(); - cur_buf_end_ = buffer_.data() + n; + WaitUntilStandByBufferReadyBytesNotEqualZero(); + if (standby_buffer_ready_bytes_ == -1) { return; } + std::swap(standby_buffer_, buffer_); + cur_buf_begin_ = buffer_; + cur_buf_end_ = buffer_ + standby_buffer_ready_bytes_; *cur_buf_end_ = '\0'; - AddNForCurFilePos(n); + standby_buffer_ready_bytes_ = 0; + AsyncUpdateStandByBuffer(); +} + +void PersistentInStreamWithoutLocalCopy::WaitUntilStandByBufferReadyBytesNotEqualZero() { + std::unique_lock lck(standby_buffer_ready_mtx_); + standby_buffer_ready_cond_.wait(lck, [this]() { return standby_buffer_ready_bytes_ != 0; }); +} + +void PersistentInStreamWithoutLocalCopy::AsyncUpdateStandByBuffer() { + g_persistent_in_thread_pool.AddWork([this]() { + uint64_t n = + std::min(Global::Get()->persistence_buf_byte(), file_size_ - cur_file_pos_); + if (n > 0) { + file_->Read(cur_file_pos_, n, standby_buffer_); + AddNForCurFilePos(n); + } + if (cur_file_pos_ == file_size_) { file_read_done_ = true; } + std::unique_lock lck(standby_buffer_ready_mtx_); + if (n > 0) { + standby_buffer_ready_bytes_ = n; + } else { + standby_buffer_ready_bytes_ = -1; + } + standby_buffer_ready_cond_.notify_all(); + }); +} + +bool PersistentInStreamWithoutLocalCopy::IsEof() const { + return cur_buf_begin_ == cur_buf_end_ && file_read_done_; } } // namespace oneflow diff --git a/oneflow/core/persistence/persistent_in_stream_without_local_copy.h b/oneflow/core/persistence/persistent_in_stream_without_local_copy.h index 2a1d7f65cd..507c8afe88 100644 --- a/oneflow/core/persistence/persistent_in_stream_without_local_copy.h +++ b/oneflow/core/persistence/persistent_in_stream_without_local_copy.h @@ -10,7 +10,7 @@ class PersistentInStreamWithoutLocalCopy : public PersistentInStream { public: OF_DISALLOW_COPY_AND_MOVE(PersistentInStreamWithoutLocalCopy); PersistentInStreamWithoutLocalCopy() = delete; - virtual ~PersistentInStreamWithoutLocalCopy() = default; + virtual ~PersistentInStreamWithoutLocalCopy(); int32_t ReadLine(std::string* l) override; int32_t Read(char* s, size_t n) override; @@ -21,18 +21,27 @@ class PersistentInStreamWithoutLocalCopy : public PersistentInStream { virtual void UpdateBuffer(); virtual void AddNForCurFilePos(uint64_t n) = 0; uint64_t file_size() const { return file_size_; } - std::vector* mut_buffer() { return &buffer_; } + char* mut_buffer() { return buffer_; } uint64_t cur_file_pos() const { return cur_file_pos_; } void set_cur_file_pos(uint64_t val) { cur_file_pos_ = val; } void set_cur_buf_begin(char* val) { cur_buf_begin_ = val; } + void WaitUntilStandByBufferReadyBytesNotEqualZero(); private: + void AsyncUpdateStandByBuffer(); bool IsEof() const; std::unique_ptr file_; uint64_t file_size_; + + char* standby_buffer_; + int64_t standby_buffer_ready_bytes_; + std::mutex standby_buffer_ready_mtx_; + std::condition_variable standby_buffer_ready_cond_; uint64_t cur_file_pos_; - std::vector buffer_; + std::atomic file_read_done_; + + char* buffer_; char* cur_buf_begin_; char* cur_buf_end_; }; diff --git a/oneflow/core/thread/thread_pool.cpp b/oneflow/core/thread/thread_pool.cpp new file mode 100644 index 0000000000..5a124f830e --- /dev/null +++ b/oneflow/core/thread/thread_pool.cpp @@ -0,0 +1,34 @@ +#include "oneflow/core/thread/thread_pool.h" + +namespace oneflow { + +ThreadPool::ThreadPool(int32_t thread_num) + : work_chans_(thread_num), threads_(thread_num), cur_chan_idx_(0) { + FOR_RANGE(int32_t, i, 0, thread_num) { + Channel>* chan = &(work_chans_.at(i)); + threads_[i] = std::thread([chan]() { + std::function work; + while (chan->Receive(&work) == 0) { work(); } + }); + } +} + +ThreadPool::~ThreadPool() { + FOR_RANGE(int32_t, i, 0, work_chans_.size()) { + work_chans_.at(i).CloseSendEnd(); + work_chans_.at(i).CloseReceiveEnd(); + threads_.at(i).join(); + } +} + +void ThreadPool::AddWork(std::function work) { + if (work_chans_.size() > 1) { + std::unique_lock lck(cur_chan_idx_mtx_); + work_chans_.at(cur_chan_idx_).Send(work); + cur_chan_idx_ = (cur_chan_idx_ + 1) % work_chans_.size(); + } else { + work_chans_.at(cur_chan_idx_).Send(work); + } +} + +} // namespace oneflow diff --git a/oneflow/core/thread/thread_pool.h b/oneflow/core/thread/thread_pool.h new file mode 100644 index 0000000000..27f8532914 --- /dev/null +++ b/oneflow/core/thread/thread_pool.h @@ -0,0 +1,28 @@ +#ifndef ONEFLOW_CORE_THREAD_THREAD_POOL_H_ +#define ONEFLOW_CORE_THREAD_THREAD_POOL_H_ + +#include "oneflow/core/common/util.h" +#include "oneflow/core/common/channel.h" + +namespace oneflow { + +class ThreadPool final { + public: + OF_DISALLOW_COPY_AND_MOVE(ThreadPool); + ThreadPool() = delete; + ThreadPool(int32_t thread_num); + ~ThreadPool(); + + void AddWork(std::function work); + + private: + std::vector>> work_chans_; + std::vector threads_; + + std::mutex cur_chan_idx_mtx_; + int32_t cur_chan_idx_; +}; + +} // namespace oneflow + +#endif // ONEFLOW_CORE_THREAD_THREAD_POOL_H_ -- GitLab