From 3f8031e256f94cac170497405e10815f119dbd40 Mon Sep 17 00:00:00 2001 From: jiaqi <173596896@qq.com> Date: Fri, 21 Jun 2019 14:24:54 +0800 Subject: [PATCH] dataset (#17973) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (1) use channel instead of vector/BlockingQueue in Dataset, to stay consistent with the existing implementation and to make the code more readable and flexible (dataset with a single output channel or multiple output channels). One previous memory-out-of-limit problem was caused by not releasing memory after training. (2) add Record because MultiSlotType costs too much memory (80B); this fixes the memory-out-of-limit problem. (3) add Channel, Archive in paddle/fluid/framework (4) change dataset from shared_ptr to unique_ptr in pybind (5) move create/destroy readers from trainer to dataset (6) move shuffle from datafeed to dataset. dataset holds memory; datafeed is only for loading data and feeding data to the network. (7) fix thread num bug of Dataset when filelist size < thread num (8) support set_queue_num in InMemoryDataset --- paddle/fluid/framework/archive.h | 609 +++++++++++++++++++ paddle/fluid/framework/channel.h | 460 ++++++++++++++ paddle/fluid/framework/data_feed.cc | 418 ++++--------- paddle/fluid/framework/data_feed.h | 211 +++++-- paddle/fluid/framework/data_set.cc | 306 +++++++--- paddle/fluid/framework/data_set.h | 37 +- paddle/fluid/framework/dataset_factory.cc | 8 +- paddle/fluid/framework/dataset_factory.h | 2 +- paddle/fluid/framework/device_worker.cc | 2 +- paddle/fluid/framework/device_worker.h | 4 +- paddle/fluid/framework/dist_multi_trainer.cc | 4 +- paddle/fluid/framework/expect.h | 32 + paddle/fluid/framework/multi_trainer.cc | 5 +- paddle/fluid/framework/pipeline_trainer.cc | 5 +- paddle/fluid/framework/trainer.h | 4 +- paddle/fluid/pybind/data_set_py.cc | 71 ++- python/paddle/fluid/dataset.py | 96 ++- python/paddle/fluid/executor.py | 6 +- 18 files changed, 1784 insertions(+), 496 deletions(-) create mode 100644 
paddle/fluid/framework/archive.h create mode 100644 paddle/fluid/framework/channel.h create mode 100644 paddle/fluid/framework/expect.h diff --git a/paddle/fluid/framework/archive.h b/paddle/fluid/framework/archive.h new file mode 100644 index 00000000000..100eb9518f7 --- /dev/null +++ b/paddle/fluid/framework/archive.h @@ -0,0 +1,609 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "paddle/fluid/framework/expect.h" + +namespace paddle { +namespace framework { + +// not a virtual class +class ArchiveBase { + protected: + ArchiveBase() {} + + // Archive is not copyable. But to allow move capture by function objects, + // check it at runtime rather than at compile time. 
+ ArchiveBase(const ArchiveBase&) { LOG(FATAL) << "Not supported"; } + + ArchiveBase(ArchiveBase&& other) + : buffer_(other.buffer_), + cursor_(other.cursor_), + finish_(other.finish_), + limit_(other.limit_), + deleter_(std::move(other.deleter_)) { + other.buffer_ = NULL; + other.cursor_ = NULL; + other.finish_ = NULL; + other.limit_ = NULL; + other.deleter_ = nullptr; + } + + ~ArchiveBase() { FreeBuffer(); } + + public: + ArchiveBase& operator=(const ArchiveBase&) { + LOG(FATAL) << "Not supported"; + return *this; + } + + ArchiveBase& operator=(ArchiveBase&& other) { + if (this != &other) { + FreeBuffer(); + buffer_ = other.buffer_; + cursor_ = other.cursor_; + finish_ = other.finish_; + limit_ = other.limit_; + deleter_ = std::move(other.deleter_); + other.buffer_ = NULL; + other.cursor_ = NULL; + other.finish_ = NULL; + other.limit_ = NULL; + other.deleter_ = nullptr; + } + return *this; + } + + char* Buffer() { return buffer_; } + + void SetReadBuffer(char* buffer, size_t length, + std::function&& deleter) { + SetBuffer(buffer, length, length, std::move(deleter)); + } + + void SetWriteBuffer(char* buffer, size_t capacity, + std::function&& deleter) { + SetBuffer(buffer, 0, capacity, std::move(deleter)); + } + + void SetBuffer(char* buffer, size_t length, size_t capacity, + std::function&& deleter) { + CHECK(length <= capacity); + FreeBuffer(); + buffer_ = buffer; + cursor_ = buffer_; + finish_ = buffer + length; + limit_ = buffer + capacity; + deleter_ = std::move(deleter); + } + + char* Cursor() { return cursor_; } + + void SetCursor(char* cursor) { + CHECK(cursor >= buffer_ && cursor <= finish_); + cursor_ = cursor; + } + + void AdvanceCursor(size_t offset) { + CHECK(offset <= size_t(finish_ - cursor_)); + cursor_ += offset; + } + + char* Finish() { return finish_; } + + void SetFinish(char* finish) { + CHECK(finish >= cursor_ && finish <= limit_); + finish_ = finish; + } + + void AdvanceFinish(size_t offset) { + CHECK(offset <= size_t(limit_ - finish_)); + 
finish_ += offset; + } + + char* Limit() { return limit_; } + + size_t Position() { return cursor_ - buffer_; } + + size_t Length() { return finish_ - buffer_; } + + size_t Capacity() { return limit_ - buffer_; } + + bool Empty() { return finish_ == buffer_; } + + void Reset() { + FreeBuffer(); + buffer_ = NULL; + cursor_ = NULL; + finish_ = NULL; + limit_ = NULL; + } + + void Clear() { + cursor_ = buffer_; + finish_ = buffer_; + } + + char* Release() { + char* buf = buffer_; + buffer_ = NULL; + cursor_ = NULL; + finish_ = NULL; + deleter_ = nullptr; + return buf; + } + + void Resize(size_t newsize) { +#ifdef _LINUX + if (unlikely(newsize > Capacity())) { +#else + if (newsize > Capacity()) { +#endif + Reserve(std::max(Capacity() * 2, newsize)); + } + finish_ = buffer_ + newsize; + cursor_ = std::min(cursor_, finish_); + } + + void Reserve(size_t newcap) { + if (newcap > Capacity()) { + char* newbuf = NULL; + newbuf = new char[newcap]; + CHECK(newbuf != nullptr) << "Reserve failed, out of memory"; + if (Length() > 0) { + memcpy(newbuf, buffer_, Length()); + } + cursor_ = newbuf + (cursor_ - buffer_); + finish_ = newbuf + (finish_ - buffer_); + limit_ = newbuf + newcap; + FreeBuffer(); + buffer_ = newbuf; + deleter_ = std::default_delete(); + } + } + + void PrepareRead(size_t size) { +#ifdef _LINUX + if (unlikely(!(size <= size_t(finish_ - cursor_)))) { +#else + if (!(size <= size_t(finish_ - cursor_))) { +#endif + CHECK(size <= size_t(finish_ - cursor_)); + } + } + + void PrepareWrite(size_t size) { +#ifdef _LINUX + if (unlikely(size > size_t(limit_ - finish_))) { +#else + if (size > size_t(limit_ - finish_)) { +#endif + Reserve(std::max(Capacity() * 2, Length() + size)); + } + } + + void Read(void* data, size_t size) { + if (size > 0) { + PrepareRead(size); + memcpy(data, cursor_, size); + AdvanceCursor(size); + } + } + + void ReadBack(void* data, size_t size) { + if (size > 0) { + CHECK(size <= size_t(finish_ - cursor_)); + memcpy(data, finish_ - size, size); + 
finish_ -= size; + } + } + + void Write(const void* data, size_t size) { + if (size > 0) { + PrepareWrite(size); + memcpy(finish_, data, size); + AdvanceFinish(size); + } + } + + template + void GetRaw(T& x) { // NOLINT + PrepareRead(sizeof(T)); + memcpy(&x, cursor_, sizeof(T)); + AdvanceCursor(sizeof(T)); + } + + template + T GetRaw() { + T x; + GetRaw(x); + return x; + } + + template + void PutRaw(const T& x) { + PrepareWrite(sizeof(T)); + memcpy(finish_, &x, sizeof(T)); + AdvanceFinish(sizeof(T)); + } + + protected: + char* buffer_ = NULL; + char* cursor_ = NULL; + char* finish_ = NULL; + char* limit_ = NULL; + std::function deleter_ = nullptr; + + void FreeBuffer() { + if (deleter_) { + deleter_(buffer_); + } + deleter_ = nullptr; + } +}; // NOLINT + +template +class Archive {}; + +class BinaryArchiveType {}; + +typedef Archive BinaryArchive; + +template <> +class Archive : public ArchiveBase { + public: +#define ARCHIVE_REPEAT(T) \ + BinaryArchive& operator>>(T& x) { \ + GetRaw(x); \ + return *this; \ + } \ + BinaryArchive& operator<<(const T& x) { \ + PutRaw(x); \ + return *this; \ + } + + ARCHIVE_REPEAT(int16_t) + ARCHIVE_REPEAT(uint16_t) + ARCHIVE_REPEAT(int32_t) + ARCHIVE_REPEAT(uint32_t) + ARCHIVE_REPEAT(int64_t) + ARCHIVE_REPEAT(uint64_t) + ARCHIVE_REPEAT(float) + ARCHIVE_REPEAT(double) + ARCHIVE_REPEAT(signed char) + ARCHIVE_REPEAT(unsigned char) + ARCHIVE_REPEAT(bool) + +#undef ARCHIVE_REPEAT + + template + T Get() { + T x; + *this >> x; + return x; + } +}; + +template +Archive& operator<<(Archive& ar, const T (&p)[N]) { + for (size_t i = 0; i < N; i++) { + ar << p[i]; + } + return ar; +} + +template +Archive& operator>>(Archive& ar, T (&p)[N]) { + for (size_t i = 0; i < N; i++) { + ar >> p[i]; + } + return ar; +} + +template +Archive& operator<<(Archive& ar, const std::vector& p) { +#ifdef _LINUX + ar << (size_t)p.size(); +#else + ar << (uint64_t)p.size(); +#endif + for (const auto& x : p) { + ar << x; + } + return ar; +} + +template +Archive& 
operator>>(Archive& ar, std::vector& p) { +#ifdef _LINUX + p.resize(ar.template Get()); +#else + p.resize(ar.template Get()); +#endif + for (auto& x : p) { + ar >> x; + } + return ar; +} + +template +Archive& operator<<(Archive& ar, const std::valarray& p) { +#ifdef _LINUX + ar << (size_t)p.size(); +#else + ar << (uint64_t)p.size(); +#endif + for (const auto& x : p) { + ar << x; + } + return ar; +} + +template +Archive& operator>>(Archive& ar, std::valarray& p) { +#ifdef _LINUX + p.resize(ar.template Get()); +#else + p.resize(ar.template Get()); +#endif + for (auto& x : p) { + ar >> x; + } + return ar; +} + +inline BinaryArchive& operator<<(BinaryArchive& ar, const std::string& s) { +#ifdef _LINUX + ar << (size_t)s.length(); +#else + ar << (uint64_t)s.length(); +#endif + ar.Write(&s[0], s.length()); + return ar; +} + +inline BinaryArchive& operator>>(BinaryArchive& ar, std::string& s) { +#ifdef _LINUX + size_t len = ar.template Get(); +#else + size_t len = ar.template Get(); +#endif + ar.PrepareRead(len); + s.assign(ar.Cursor(), len); + ar.AdvanceCursor(len); + return ar; +} + +template +Archive& operator<<(Archive& ar, const std::pair& x) { + return ar << x.first << x.second; +} + +template +Archive& operator>>(Archive& ar, std::pair& x) { // NOLINT + return ar >> x.first >> x.second; +} + +#ifdef _LINUX +template +Archive& SerializeTuple(Archive& ar, // NOLINT + const std::tuple& x, // NOLINT + std::integral_constant n) { // NOLINT + return ar; +} +#else +template +Archive& SerializeTuple(Archive& ar, // NOLINT + const std::tuple& x, // NOLINT + std::integral_constant n) { // NOLINT + return ar; +} +#endif + +#ifdef _LINUX +template +Archive& serialize_tuple(Archive& ar, // NOLINT + const std::tuple& x, // NOLINT + std::integral_constant n) { // NOLINT + return SerializeTuple(ar, x, std::integral_constant()) + << std::get(x); +} +#else +template +Archive& serialize_tuple(Archive& ar, // NOLINT + const std::tuple& x, // NOLINT + std::integral_constant n) { // 
NOLINT + return SerializeTuple(ar, x, std::integral_constant()) + << std::get(x); +} +#endif + +#ifdef _LINUX +template +Archive& operator<<(Archive& ar, const std::tuple& x) { + const size_t size = std::tuple_size>::value; + return SerializeTuple(ar, x, std::integral_constant()); +} +#else +template +Archive& operator<<(Archive& ar, const std::tuple& x) { + const uint64_t size = std::tuple_size>::value; + return SerializeTuple(ar, x, std::integral_constant()); +} +#endif + +#ifdef _LINUX +template +Archive& DeserializeTuple(Archive& ar, std::tuple& x, // NOLINT + std::integral_constant n) { + return ar; +} +#else +template +Archive& DeserializeTuple(Archive& ar, std::tuple& x, // NOLINT + std::integral_constant n) { + return ar; +} +#endif + +#ifdef _LINUX +template +Archive& DeserializeTuple(Archive& ar, std::tuple& x, // NOLINT + std::integral_constant n) { + return DeserializeTuple(ar, x, std::integral_constant()) >> + std::get(x); +} +#else +template +Archive& DeserializeTuple(Archive& ar, std::tuple& x, // NOLINT + std::integral_constant n) { + return DeserializeTuple(ar, x, std::integral_constant()) >> + std::get(x); +} +#endif + +#ifdef _LINUX +template +Archive& operator>>(Archive& ar, std::tuple& x) { + const size_t size = std::tuple_size>::value; + return DeserializeTuple(ar, x, std::integral_constant()); +} +#else +template +Archive& operator>>(Archive& ar, std::tuple& x) { + const uint64_t size = std::tuple_size>::value; + return DeserializeTuple(ar, x, std::integral_constant()); +} +#endif + +#ifdef _LINUX +#define ARCHIVE_REPEAT(MAP_TYPE, RESERVE_STATEMENT) \ + template \ + Archive& operator<<(Archive& ar, \ + const MAP_TYPE& p) { \ + ar << (size_t)p.size(); \ + for (auto it = p.begin(); it != p.end(); ++it) { \ + ar << *it; \ + } \ + return ar; \ + } \ + template \ + Archive& operator>>(Archive& ar, MAP_TYPE& p) { \ + size_t size = ar.template get(); \ + p.clear(); \ + RESERVE_STATEMENT; \ + for (size_t i = 0; i < size; i++) { \ + 
p.insert(ar.template get>()); \ + } \ + return ar; \ + } +#else +#define ARCHIVE_REPEAT(MAP_TYPE, RESERVE_STATEMENT) \ + template \ + Archive& operator<<(Archive& ar, \ + const MAP_TYPE& p) { \ + ar << (uint64_t)p.size(); \ + for (auto it = p.begin(); it != p.end(); ++it) { \ + ar << *it; \ + } \ + return ar; \ + } \ + template \ + Archive& operator>>(Archive& ar, MAP_TYPE& p) { \ + size_t size = ar.template get(); \ + p.clear(); \ + RESERVE_STATEMENT; \ + for (size_t i = 0; i < size; i++) { \ + p.insert(ar.template get>()); \ + } \ + return ar; \ + } +#endif + +ARCHIVE_REPEAT(std::map, ) +ARCHIVE_REPEAT(std::multimap, ) +ARCHIVE_REPEAT(std::unordered_map, p.reserve(size)) +ARCHIVE_REPEAT(std::unordered_multimap, p.reserve(size)) + +#undef ARCHIVE_REPEAT + +#ifdef _LINUX +#define ARCHIVE_REPEAT(SET_TYPE, RESERVE_STATEMENT) \ + template \ + Archive& operator<<(Archive& ar, const SET_TYPE& p) { \ + ar << (size_t)p.size(); \ + for (auto it = p.begin(); it != p.end(); ++it) { \ + ar << *it; \ + } \ + return ar; \ + } \ + template \ + Archive& operator>>(Archive& ar, SET_TYPE& p) { \ + size_t size = ar.template get(); \ + p.clear(); \ + RESERVE_STATEMENT; \ + for (size_t i = 0; i < size; i++) { \ + p.insert(ar.template get()); \ + } \ + return ar; \ + } +#else +#define ARCHIVE_REPEAT(SET_TYPE, RESERVE_STATEMENT) \ + template \ + Archive& operator<<(Archive& ar, const SET_TYPE& p) { \ + ar << (uint64_t)p.size(); \ + for (auto it = p.begin(); it != p.end(); ++it) { \ + ar << *it; \ + } \ + return ar; \ + } \ + template \ + Archive& operator>>(Archive& ar, SET_TYPE& p) { \ + size_t size = ar.template get(); \ + p.clear(); \ + RESERVE_STATEMENT; \ + for (size_t i = 0; i < size; i++) { \ + p.insert(ar.template get()); \ + } \ + return ar; \ + } +#endif + +ARCHIVE_REPEAT(std::set, ) +ARCHIVE_REPEAT(std::multiset, ) +ARCHIVE_REPEAT(std::unordered_set, p.reserve(size)) +ARCHIVE_REPEAT(std::unordered_multiset, p.reserve(size)) + +#undef ARCHIVE_REPEAT + +} // namespace framework 
+} // namespace paddle diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h new file mode 100644 index 00000000000..644f60dbebf --- /dev/null +++ b/paddle/fluid/framework/channel.h @@ -0,0 +1,460 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +#include +#include +#include // NOLINT +#include +#include +#include +#include // NOLINT +#include +#include +#include "paddle/fluid/framework/expect.h" + +namespace paddle { +namespace framework { + +template +class ChannelObject { + public: + ChannelObject() {} + + // capacity can be zero + explicit ChannelObject(size_t capacity) { + capacity_ = std::min(MaxCapacity(), capacity); + } + + void Clear() { + std::unique_lock lock(mutex_); + data_.clear(); + data_.shrink_to_fit(); + } + + size_t Capacity() { + return capacity_; // atomic + } + + void SetCapacity(size_t x) { // capacity can be zero + std::lock_guard lock(mutex_); + capacity_ = std::min(MaxCapacity(), x); + Notify(); + } + + size_t BlockSize() { + return block_size_; // atomic + } + + void SetBlockSize(size_t x) { + CHECK(x >= 1) << "block size must be >= 1"; + std::lock_guard lock(mutex_); + block_size_ = x; + } + + template + void InheritFrom(const std::shared_ptr>& other) { + std::lock_guard lock(mutex_); + capacity_ = other->Capacity(); + block_size_ = 
other->BlockSize(); + } + + bool Closed() { + return closed_; // atomic + } + + // open channel, then data can be write() to channel + void Open() { + std::lock_guard lock(mutex_); + closed_ = false; + Notify(); + } + + // close channel, then no more data can be write() to channel + void Close() { + std::lock_guard lock(mutex_); + closed_ = true; + Notify(); + } + + size_t Size() { + std::lock_guard lock(mutex_); + return data_.size(); + } + + bool Empty() { + std::lock_guard lock(mutex_); + return EmptyUnlocked(); + } + + // blocking operation + bool Get(T& val) { return Read(1, &val) != 0; } // NOLINT + + // blocking operation + // returns 0 if the channel is closed and empty + size_t Read(size_t n, T* p) { + if (n == 0) { + return 0; + } + + std::unique_lock lock(mutex_); + size_t finished = Read(n, p, lock); + Notify(); + return finished; + } + + // blocking operation + bool Put(T&& val) { return WriteMove(1, &val) != 0; } + + // blocking operation + bool Put(const T& val) { return Write(1, &val) != 0; } + + // blocking operation + // returns value less than n if the channel is closed + size_t Write(size_t n, const T* p) { + if (n == 0) { + return 0; + } + std::unique_lock lock(mutex_); + size_t finished = Write(n, p, lock); + Notify(); + return finished; + } + + // WriteMove() will clear original contents of input array + size_t WriteMove(size_t n, T* p) { + if (n == 0) { + return 0; + } + std::unique_lock lock(mutex_); + size_t finished = WriteMove(n, p, lock); + Notify(); + return finished; + } + + // read data of block size from channel to vector + size_t Read(std::vector& p) { // NOLINT + p.resize(block_size_); + size_t finished = Read(p.size(), &p[0]); + p.resize(finished); + return finished; + } + + size_t ReadAll(std::vector& p) { // NOLINT + p.clear(); + size_t finished = 0; + size_t n = 0; + do { + // _block_size may change anytime + n = block_size_; + p.resize(finished + n); + n = Read(n, &p[finished]); + finished += n; + } while (n != 0); + 
p.resize(finished); + return finished; + } + + // write data from vector to channel + size_t Write(const std::vector& p) { return Write(p.size(), &p[0]); } + + // write data from vector to channel + size_t Write(std::vector&& p) { return WriteMove(p.size(), &p[0]); } + + private: + size_t capacity_ = MaxCapacity(); + size_t block_size_ = 1024; + bool closed_ = false; + std::mutex mutex_; + // use deque to store data + std::deque data_; + size_t reading_count_ = 0; + int empty_waiters_ = 0; + int full_waiters_ = 0; + std::condition_variable empty_cond_; + std::condition_variable full_cond_; + + static constexpr size_t MaxCapacity() { + return std::numeric_limits::max() / 2; + } + + void Notify() { + if (empty_waiters_ != 0 && (!EmptyUnlocked() || closed_)) { + empty_cond_.notify_one(); + } + if (full_waiters_ != 0 && (!FullUnlocked() || closed_)) { + full_cond_.notify_one(); + } + } + + bool EmptyUnlocked() { return data_.empty(); } + + bool FullUnlocked() { return data_.size() >= capacity_ + reading_count_; } + + bool WaitForRead(std::unique_lock& lock) { // NOLINT +#ifdef _LINUX + while (unlikely(EmptyUnlocked() && !closed_)) { +#else + while (EmptyUnlocked() && !closed_) { +#endif + if (full_waiters_ != 0) { + full_cond_.notify_one(); + } + empty_waiters_++; + empty_cond_.wait(lock); + empty_waiters_--; + } + return !EmptyUnlocked(); + } + + bool WaitForWrite(std::unique_lock& lock) { // NOLINT +#ifdef _LINUX + while (unlikely(FullUnlocked() && !closed_)) { +#else + while (FullUnlocked() && !closed_) { +#endif + if (empty_waiters_ != 0) { + empty_cond_.notify_one(); + } + full_waiters_++; + full_cond_.wait(lock); + full_waiters_--; + } + return !closed_; + } + + size_t Read(size_t n, T* p, std::unique_lock& lock) { // NOLINT + size_t finished = 0; + CHECK(n <= MaxCapacity() - reading_count_); + reading_count_ += n; + while (finished < n && WaitForRead(lock)) { + size_t m = std::min(n - finished, data_.size()); + for (size_t i = 0; i < m; i++) { + p[finished++] = 
std::move(data_.front()); + data_.pop_front(); + } + reading_count_ -= m; + } + reading_count_ -= n - finished; + return finished; + } + + size_t Write(size_t n, + const T* p, // NOLINT + std::unique_lock& lock) { // NOLINT + size_t finished = 0; + while (finished < n && WaitForWrite(lock)) { + size_t m = + std::min(n - finished, capacity_ + reading_count_ - data_.size()); + for (size_t i = 0; i < m; i++) { + data_.push_back(p[finished++]); + } + } + return finished; + } + + size_t WriteMove(size_t n, + T* p, // NOLINT + std::unique_lock& lock) { // NOLINT + size_t finished = 0; + while (finished < n && WaitForWrite(lock)) { + size_t m = + std::min(n - finished, capacity_ + reading_count_ - data_.size()); + for (size_t i = 0; i < m; i++) { + data_.push_back(std::move(p[finished++])); + } + } + return finished; + } +}; // NOLINT + +template +using Channel = std::shared_ptr>; + +template +Channel MakeChannel(size_t capacity = std::numeric_limits::max()) { + return std::make_shared>(capacity); +} + +template +Channel MakeChannel(const Channel& other) { + CHECK(other != nullptr) << "channel can not be NULL"; + Channel chan = std::make_shared>(); + chan->InheritFrom(other); + return chan; +} + +// NOTE: ChannelReader is a wrapper for quick read channel with a buffer. It +// will read a block data from channel, but user can get data one by one. So it +// is important to notice that user must call operator>> until false, or call +// get_buffer_remain until false to make sure the buffered data all readed. 
+template +class ChannelReader { + public: + explicit ChannelReader(ChannelObject* channel = nullptr) { + Reset(channel); + } + + ~ChannelReader() { CHECK(cursor_ == 0) << "Forgot to read buffer data"; } + + ChannelObject* channel() { return channel_; } + + void Reset(ChannelObject* channel) { + CHECK(channel != nullptr) << "Channel can not be nullptr"; + channel_ = channel; + cursor_ = 0; + failed_ = !channel; + } + + // whether there were read failed + operator bool() { return !failed_; } + + ChannelReader& operator>>(T& val) { + if (failed_) { + return *this; + } + if (cursor_ >= buffer_.size()) { + cursor_ = 0; + if (channel_->read(buffer_) == 0) { + failed_ = true; + return *this; + } + } + val = std::move(buffer_[cursor_++]); + return *this; + } + + bool GetBufferRemain(T& val) { // NOLINT + if (cursor_ >= buffer_.size()) { + cursor_ = 0; + return false; + } + val = std::move(buffer_[cursor_++]); + return true; + } + + private: + ChannelObject* channel_ = nullptr; + std::vector buffer_; + size_t cursor_ = 0; + bool failed_ = true; +}; // NOLINT + +template +class ChannelWriter { + public: + explicit ChannelWriter(ChannelObject* channel = nullptr) { + Reset(channel); + } + + ~ChannelWriter() { CHECK(buffer_.empty()) << "Forgot to flush"; } + + ChannelObject* channel() { return channel_; } + + void Reset(ChannelObject* channel) { + CHECK(buffer_.empty()) << "Forgot to flush"; + CHECK(channel != nullptr) << "Channel can not be nullptr"; + channel_ = channel; + buffer_.clear(); + failed_ = !channel; + } + + // whether there were write failed + operator bool() { return !failed_; } + + ChannelWriter& operator<<(T&& val) { + if (failed_) { + return *this; + } + buffer_.push_back(std::move(val)); + if (buffer_.size() >= channel_->BlockSize()) { + Flush(); + } + return *this; + } + + ChannelWriter& operator<<(const T& val) { + if (failed_) { + return *this; + } + buffer_.push_back(val); + if (buffer_.size() >= channel_->BlockSize()) { + Flush(); + } + return *this; + 
} + + void Flush() { + if (failed_ || buffer_.empty()) { + buffer_.clear(); + return; + } + failed_ |= + channel_->WriteMove(buffer_.size(), &buffer_[0]) != buffer_.size(); + buffer_.clear(); + } + + private: + ChannelObject* channel_ = nullptr; + std::vector buffer_; + bool failed_ = true; +}; // NOLINT + +// only used for range-for loop +// for (auto& x : chan) {...} +template +struct ChannelIterator { + std::shared_ptr> reader_; + T data_; + + void operator++() { + CHECK(reader_ != nullptr) << "reader can not be NULL"; + if (!(*reader_ >> data_)) { + reader_ = nullptr; + } + } + + T& operator*() { return data_; } + + friend bool operator==(const ChannelIterator& a, + const ChannelIterator& b) { + return a.reader_ == b.reader_; + } + + friend bool operator!=(const ChannelIterator& a, + const ChannelIterator& b) { + return a.reader_ != b.reader_; + } +}; // NOLINT + +template +ChannelIterator begin(ChannelObject* chan) { + ChannelIterator it{std::make_shared>(chan), T()}; + ++it; + return it; +} + +template +ChannelIterator end(ChannelObject* chan) { + return {nullptr, T()}; +} + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index e89f3f1a4e0..697a3839831 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -160,81 +160,82 @@ template class PrivateQueueDataFeed>; template InMemoryDataFeed::InMemoryDataFeed() { - cur_channel_ = 0; - shuffled_ins_ = std::make_shared>(); - shuffled_ins_out_ = std::make_shared>(); - fleet_send_batch_size_ = 80000; // hard code here - memory_data_ = nullptr; - mutex_for_update_memory_data_ = nullptr; this->file_idx_ = nullptr; this->mutex_for_pick_file_ = nullptr; - fleet_send_sleep_seconds_ = 2; + this->fp_ = nullptr; + this->thread_id_ = 0; + this->thread_num_ = 1; + this->input_channel_ = nullptr; + this->output_channel_ = nullptr; + this->consume_channel_ = nullptr; } template bool 
InMemoryDataFeed::Start() { #ifdef _LINUX - DataFeed::CheckSetFileList(); - if (shuffled_ins_->Size() == 0 && shuffled_ins_out_->Size() == 0) { - FillMemoryDataToChannel(); + this->CheckSetFileList(); + if (output_channel_->Size() == 0 && input_channel_->Size() != 0) { + std::vector data; + input_channel_->Read(data); + output_channel_->Write(std::move(data)); } #endif - DataFeed::finish_start_ = true; + this->finish_start_ = true; return true; } template int InMemoryDataFeed::Next() { #ifdef _LINUX - DataFeed::CheckStart(); - std::shared_ptr> in_channel = nullptr; - std::shared_ptr> out_channel = nullptr; - if (cur_channel_ == 0) { - in_channel = shuffled_ins_; - out_channel = shuffled_ins_out_; - } else { - in_channel = shuffled_ins_out_; - out_channel = shuffled_ins_; - } - CHECK(in_channel != nullptr); - CHECK(out_channel != nullptr); - VLOG(3) << "in_channel size=" << in_channel->Size() - << ", out_channel size=" << out_channel->Size() + this->CheckStart(); + CHECK(output_channel_ != nullptr); + CHECK(consume_channel_ != nullptr); + VLOG(3) << "output_channel_ size=" << output_channel_->Size() + << ", consume_channel_ size=" << consume_channel_->Size() << ", thread_id=" << thread_id_; int index = 0; T instance; - T ins_vec; - while (index < DataFeed::default_batch_size_) { - if (in_channel->Size() == 0) { + std::vector ins_vec; + ins_vec.reserve(this->default_batch_size_); + while (index < this->default_batch_size_) { + if (output_channel_->Size() == 0) { break; } - in_channel->Pop(&instance); - - AddInstanceToInsVec(&ins_vec, instance, index++); - out_channel->Push(std::move(instance)); + output_channel_->Get(instance); + ins_vec.push_back(instance); + ++index; + consume_channel_->Put(std::move(instance)); } - DataFeed::batch_size_ = index; - VLOG(3) << "batch_size_=" << DataFeed::batch_size_ + this->batch_size_ = index; + VLOG(3) << "batch_size_=" << this->batch_size_ << ", thread_id=" << thread_id_; - if (DataFeed::batch_size_ != 0) { + if 
(this->batch_size_ != 0) { PutToFeedVec(ins_vec); } else { - cur_channel_ = 1 - cur_channel_; + VLOG(3) << "finish reading, output_channel_ size=" + << output_channel_->Size() + << ", consume_channel_ size=" << consume_channel_->Size() + << ", thread_id=" << thread_id_; } - return DataFeed::batch_size_; + return this->batch_size_; #else return 0; #endif } template -void InMemoryDataFeed::SetMemoryData(void* memory_data) { - memory_data_ = static_cast*>(memory_data); +void InMemoryDataFeed::SetInputChannel(void* channel) { + input_channel_ = static_cast*>(channel); +} + +template +void InMemoryDataFeed::SetOutputChannel(void* channel) { + output_channel_ = static_cast*>(channel); } template -void InMemoryDataFeed::SetMemoryDataMutex(std::mutex* mutex) { - mutex_for_update_memory_data_ = mutex; +void InMemoryDataFeed::SetConsumeChannel(void* channel) { + consume_channel_ = static_cast*>(channel); } template @@ -247,213 +248,38 @@ void InMemoryDataFeed::SetThreadNum(int thread_num) { thread_num_ = thread_num; } -template -void InMemoryDataFeed::SetTrainerNum(int trainer_num) { - trainer_num_ = trainer_num; -} - -template -void InMemoryDataFeed::SetFleetSendBatchSize(int64_t size) { - fleet_send_batch_size_ = size; -} - -template -void InMemoryDataFeed::PutInsToChannel(const std::string& ins_str) { -#ifdef _LINUX - std::vector ins; - DeserializeIns(&ins, ins_str); - shuffled_ins_->Extend(std::move(ins)); - VLOG(3) << "PutInsToChannel put ins num=" << ins.size() - << " to channel, channel size=" << shuffled_ins_->Size() - << " thread_id=" << thread_id_; -#endif -} - -template -void InMemoryDataFeed::FillMemoryDataToChannel() { -#ifdef _LINUX - VLOG(3) << "FillMemoryDataToChannel, thread_id=" << thread_id_; - auto interval = GetMemoryDataInterval(); - VLOG(3) << "memory data size=" << memory_data_->size() - << ", fill data from [" << interval.first << ", " << interval.second - << "), thread_id=" << thread_id_; - for (int64_t i = interval.first; i < interval.second; ++i) 
{ - T& t = (*memory_data_)[i]; - shuffled_ins_->Push(std::move(t)); - } -#endif -} - -template -void InMemoryDataFeed::FillChannelToMemoryData() { -#ifdef _LINUX - VLOG(3) << "FillChannelToMemoryData, thread_id=" << thread_id_; - std::vector local_vec; - std::shared_ptr> channel = nullptr; - std::shared_ptr> pre_channel = nullptr; - if (cur_channel_ == 0) { - channel = shuffled_ins_; - pre_channel = shuffled_ins_out_; - } else { - channel = shuffled_ins_out_; - pre_channel = shuffled_ins_; - } - CHECK(channel != nullptr); - CHECK(pre_channel != nullptr); - CHECK_EQ(pre_channel->Size(), 0); - local_vec.resize(channel->Size()); - for (int64_t i = 0; i < local_vec.size(); ++i) { - channel->Pop(&local_vec[i]); - } - VLOG(3) << "local_vec size=" << local_vec.size() - << ", thread_id=" << thread_id_; - { - std::lock_guard g(*mutex_for_update_memory_data_); - VLOG(3) << "before insert, memory_data_ size=" << memory_data_->size() - << ", thread_id=" << thread_id_; - memory_data_->insert(memory_data_->end(), local_vec.begin(), - local_vec.end()); - VLOG(3) << "after insert memory_data_ size=" << memory_data_->size() - << ", thread_id=" << thread_id_; - } - std::vector().swap(local_vec); -#endif -} - template void InMemoryDataFeed::LoadIntoMemory() { #ifdef _LINUX VLOG(3) << "LoadIntoMemory() begin, thread_id=" << thread_id_; - std::vector local_vec; std::string filename; - while (DataFeed::PickOneFile(&filename)) { + while (this->PickOneFile(&filename)) { VLOG(3) << "PickOneFile, filename=" << filename << ", thread_id=" << thread_id_; int err_no = 0; - PrivateQueueDataFeed::fp_ = - fs_open_read(filename, &err_no, PrivateQueueDataFeed::pipe_command_); - CHECK(PrivateQueueDataFeed::fp_ != nullptr); - __fsetlocking(&*PrivateQueueDataFeed::fp_, FSETLOCKING_BYCALLER); + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); + CHECK(this->fp_ != nullptr); + __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); + paddle::framework::ChannelWriter writer(input_channel_); T 
instance; platform::Timer timeline; timeline.Start(); while (ParseOneInstanceFromPipe(&instance)) { - local_vec.push_back(instance); + writer << std::move(instance); + instance = T(); } + writer.Flush(); timeline.Pause(); VLOG(3) << "LoadIntoMemory() read all lines, file=" << filename << ", cost time=" << timeline.ElapsedSec() << " seconds, thread_id=" << thread_id_; - { - std::lock_guard lock(*mutex_for_update_memory_data_); - timeline.Start(); - memory_data_->insert(memory_data_->end(), - std::make_move_iterator(local_vec.begin()), - std::make_move_iterator(local_vec.end())); - timeline.Pause(); - VLOG(3) << "LoadIntoMemory() memory_data insert, cost time=" - << timeline.ElapsedSec() << " seconds, thread_id=" << thread_id_; - } - local_vec.clear(); } - std::vector().swap(local_vec); VLOG(3) << "LoadIntoMemory() end, thread_id=" << thread_id_; #endif } -template -void InMemoryDataFeed::LocalShuffle() { -#ifdef _LINUX - VLOG(3) << "LocalShuffle() begin, thread_id=" << thread_id_; - FillMemoryDataToChannel(); - VLOG(3) << "LocalShuffle() end, thread_id=" << thread_id_; -#endif -} - -template -void InMemoryDataFeed::GlobalShuffle() { -#ifdef _LINUX - VLOG(3) << "GlobalShuffle() begin, thread_id=" << thread_id_; - auto fleet_ptr = FleetWrapper::GetInstance(); - std::vector> send_vec(trainer_num_); - std::vector send_index(trainer_num_); - uint64_t reserve_len = fleet_send_batch_size_ / trainer_num_ + 1; - for (auto& vec : send_vec) { - vec.reserve(reserve_len); - } - for (int i = 0; i < trainer_num_; ++i) { - send_index[i] = i; - } - std::vector> total_status; - auto interval = GetMemoryDataInterval(); - VLOG(3) << "global shuffle data from [" << interval.first << ", " - << interval.second << "), thread_id=" << thread_id_; - - for (int64_t i = interval.first; i < interval.second; - i += fleet_send_batch_size_) { - for (int64_t j = 0; j < fleet_send_batch_size_ && i + j < interval.second; - ++j) { - int64_t random_num = fleet_ptr->LocalRandomEngine()(); - int64_t 
node_id = random_num % trainer_num_; - send_vec[node_id].push_back(&((*memory_data_)[i + j])); - } - total_status.clear(); - std::shuffle(send_index.begin(), send_index.end(), - fleet_ptr->LocalRandomEngine()); - for (int index = 0; index < send_index.size(); ++index) { - int j = send_index[index]; - if (send_vec[j].size() == 0) { - continue; - } - std::string send_str; - SerializeIns(send_vec[j], &send_str); - auto ret = fleet_ptr->SendClientToClientMsg(0, j, send_str); - total_status.push_back(std::move(ret)); - send_vec[j].clear(); - } - for (auto& t : total_status) { - t.wait(); - } - sleep(fleet_send_sleep_seconds_); - } - VLOG(3) << "GlobalShuffle() end, thread_id=" << thread_id_; -#endif -} - -template -std::pair InMemoryDataFeed::GetMemoryDataInterval() { - int64_t start = 0; - int64_t end = 0; - int64_t size = memory_data_->size(); - for (int64_t i = 0; i <= static_cast(thread_id_); ++i) { - int64_t len = size / static_cast(thread_num_) + - (i < (size % static_cast(thread_num_))); - start = end; - end += len; - } - return std::make_pair(start, end); -} - -template -int64_t InMemoryDataFeed::GetChannelDataSize() { - if (cur_channel_ == 0) { - return shuffled_ins_->Size(); - } else { - return shuffled_ins_out_->Size(); - } -} - -template -void InMemoryDataFeed::ReleaseChannelData() { - if (cur_channel_ == 0) { - shuffled_ins_->Clear(); - } else { - shuffled_ins_out_->Clear(); - } -} - // explicit instantiation -template class InMemoryDataFeed>; +template class InMemoryDataFeed; void MultiSlotDataFeed::Init( const paddle::framework::DataFeedDesc& data_feed_desc) { @@ -807,7 +633,6 @@ void MultiSlotInMemoryDataFeed::Init( paddle::framework::MultiSlotDesc multi_slot_desc = data_feed_desc.multi_slot_desc(); SetBatchSize(data_feed_desc.batch_size()); - SetQueueSize(data_feed_desc.batch_size()); size_t all_slot_num = multi_slot_desc.slots_size(); all_slots_.resize(all_slot_num); all_slots_type_.resize(all_slot_num); @@ -848,17 +673,13 @@ void 
MultiSlotInMemoryDataFeed::Init( finish_init_ = true; } -bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( - std::vector* instance) { +bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) { #ifdef _LINUX thread_local string::LineFileReader reader; if (!reader.getline(&*(fp_.get()))) { return false; } else { - int use_slots_num = use_slots_.size(); - instance->resize(use_slots_num); - const char* str = reader.get(); std::string line = std::string(str); // VLOG(3) << line; @@ -875,16 +696,27 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( "characters.\nplease check this error line: %s", str); if (idx != -1) { - (*instance)[idx].Init(all_slots_type_[i]); - if ((*instance)[idx].GetType()[0] == 'f') { // float + if (all_slots_type_[i][0] == 'f') { // float for (int j = 0; j < num; ++j) { float feasign = strtof(endptr, &endptr); - (*instance)[idx].AddValue(feasign); + // if float feasign is equal to zero, ignore it + if (fabs(feasign) < 1e-6) { + continue; + } + FeatureKey f; + f.float_feasign_ = feasign; + instance->float_feasigns_.push_back(FeatureItem(f, idx)); } - } else if ((*instance)[idx].GetType()[0] == 'u') { // uint64 + } else if (all_slots_type_[i][0] == 'u') { // uint64 for (int j = 0; j < num; ++j) { uint64_t feasign = (uint64_t)strtoull(endptr, &endptr, 10); - (*instance)[idx].AddValue(feasign); + // if uint64 feasign is equal to zero, ignore it + if (feasign == 0) { + continue; + } + FeatureKey f; + f.uint64_feasign_ = feasign; + instance->uint64_feasigns_.push_back(FeatureItem(f, idx)); } } pos = endptr - str; @@ -897,6 +729,8 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( } } } + instance->float_feasigns_.shrink_to_fit(); + instance->uint64_feasigns_.shrink_to_fit(); return true; } #else @@ -904,13 +738,10 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( #endif } -bool MultiSlotInMemoryDataFeed::ParseOneInstance( - std::vector* instance) { +bool 
MultiSlotInMemoryDataFeed::ParseOneInstance(Record* instance) { #ifdef _LINUX std::string line; if (getline(file_, line)) { - int use_slots_num = use_slots_.size(); - instance->resize(use_slots_num); VLOG(3) << line; // parse line const char* str = line.c_str(); @@ -928,16 +759,25 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance( str); if (idx != -1) { - (*instance)[idx].Init(all_slots_type_[i]); - if ((*instance)[idx].GetType()[0] == 'f') { // float + if (all_slots_type_[i][0] == 'f') { // float for (int j = 0; j < num; ++j) { float feasign = strtof(endptr, &endptr); - (*instance)[idx].AddValue(feasign); + if (fabs(feasign) < 1e-6) { + continue; + } + FeatureKey f; + f.float_feasign_ = feasign; + instance->float_feasigns_.push_back(FeatureItem(f, idx)); } - } else if ((*instance)[idx].GetType()[0] == 'u') { // uint64 + } else if (all_slots_type_[i][0] == 'u') { // uint64 for (int j = 0; j < num; ++j) { uint64_t feasign = (uint64_t)strtoull(endptr, &endptr, 10); - (*instance)[idx].AddValue(feasign); + if (feasign == 0) { + continue; + } + FeatureKey f; + f.uint64_feasign_ = feasign; + instance->uint64_feasigns_.push_back(FeatureItem(f, idx)); } } pos = endptr - str; @@ -947,6 +787,9 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance( } } } + instance->float_feasigns_.shrink_to_fit(); + instance->uint64_feasigns_.shrink_to_fit(); + return true; } else { return false; } @@ -954,46 +797,64 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance( return false; } -void MultiSlotInMemoryDataFeed::AddInstanceToInsVec( - std::vector* ins_vec, - const std::vector& instance, int index) { +void MultiSlotInMemoryDataFeed::PutToFeedVec( + const std::vector& ins_vec) { #ifdef _LINUX - if (index == 0) { - ins_vec->resize(instance.size()); - for (size_t i = 0; i < instance.size(); ++i) { - (*ins_vec)[i].Init(instance[i].GetType()); - (*ins_vec)[i].InitOffset(); + std::vector> batch_float_feasigns(use_slots_.size(), + std::vector()); + std::vector> batch_uint64_feasigns( + 
use_slots_.size(), std::vector()); + std::vector> offset(use_slots_.size(), + std::vector{0}); + std::vector visit(use_slots_.size(), false); + for (size_t i = 0; i < ins_vec.size(); ++i) { + auto& r = ins_vec[i]; + for (auto& item : r.float_feasigns_) { + batch_float_feasigns[item.slot()].push_back(item.sign().float_feasign_); + visit[item.slot()] = true; + } + for (auto& item : r.uint64_feasigns_) { + batch_uint64_feasigns[item.slot()].push_back(item.sign().uint64_feasign_); + visit[item.slot()] = true; + } + for (size_t j = 0; j < use_slots_.size(); ++j) { + const auto& type = all_slots_type_[j]; + if (visit[j]) { + visit[j] = false; + } else { + // fill slot value with default value 0 + if (type[0] == 'f') { // float + batch_float_feasigns[j].push_back(0.0); + } else if (type[0] == 'u') { // uint64 + batch_uint64_feasigns[j].push_back(0); + } + } + // get offset of this ins in this slot + if (type[0] == 'f') { // float + offset[j].push_back(batch_float_feasigns[j].size()); + } else if (type[0] == 'u') { // uint64 + offset[j].push_back(batch_uint64_feasigns[j].size()); + } } } - for (size_t i = 0; i < instance.size(); ++i) { - (*ins_vec)[i].AddIns(instance[i]); - } -#endif -} - -void MultiSlotInMemoryDataFeed::PutToFeedVec( - const std::vector& ins_vec) { -#ifdef _LINUX for (size_t i = 0; i < use_slots_.size(); ++i) { - const auto& type = ins_vec[i].GetType(); - const auto& offset = ins_vec[i].GetOffset(); - int total_instance = static_cast(offset.back()); - + int total_instance = offset[i].back(); + const auto& type = all_slots_type_[i]; if (type[0] == 'f') { // float - const auto& feasign = ins_vec[i].GetFloatData(); + float* feasign = batch_float_feasigns[i].data(); float* tensor_ptr = feed_vec_[i]->mutable_data( {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float)); + memcpy(tensor_ptr, feasign, total_instance * sizeof(float)); } else if (type[0] == 'u') { // uint64 // no uint64_t type in paddlepaddle - 
const auto& feasign = ins_vec[i].GetUint64Data(); + uint64_t* feasign = batch_uint64_feasigns[i].data(); int64_t* tensor_ptr = feed_vec_[i]->mutable_data( {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t)); + memcpy(tensor_ptr, feasign, total_instance * sizeof(int64_t)); } - - LoD data_lod{offset}; + auto& slot_offset = offset[i]; + LoD data_lod{slot_offset}; feed_vec_[i]->set_lod(data_lod); if (use_slots_is_dense_[i]) { if (inductive_shape_index_[i] != -1) { @@ -1006,19 +867,6 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec( #endif } -// todo serialize ins in global shuffle -void MultiSlotInMemoryDataFeed::SerializeIns( - const std::vector*>& ins, std::string* str) { - auto fleet_ptr = FleetWrapper::GetInstance(); - fleet_ptr->Serialize(ins, str); -} -// todo deserialize ins in global shuffle -void MultiSlotInMemoryDataFeed::DeserializeIns( - std::vector>* ins, const std::string& str) { - auto fleet_ptr = FleetWrapper::GetInstance(); - fleet_ptr->Deserialize(ins, str); -} - #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) template void PrivateInstantDataFeed::PutToFeedVec() { diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 7fea85601c4..93f14dd74f9 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -14,6 +14,11 @@ limitations under the License. */ #pragma once +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + #include #include // NOLINT #include @@ -24,7 +29,9 @@ limitations under the License. 
*/ #include #include +#include "paddle/fluid/framework/archive.h" #include "paddle/fluid/framework/blocking_queue.h" +#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/data_feed.pb.h" #include "paddle/fluid/framework/fleet/fleet_wrapper.h" #include "paddle/fluid/framework/lod_tensor.h" @@ -88,17 +95,15 @@ class DataFeed { virtual void AssignFeedVar(const Scope& scope); // This function will do nothing at default - virtual void SetMemoryData(void* memory_data) {} + virtual void SetInputChannel(void* channel) {} + // This function will do nothing at default + virtual void SetOutputChannel(void* channel) {} // This function will do nothing at default - virtual void SetMemoryDataMutex(std::mutex* mutex) {} + virtual void SetConsumeChannel(void* channel) {} // This function will do nothing at default virtual void SetThreadId(int thread_id) {} // This function will do nothing at default virtual void SetThreadNum(int thread_num) {} - // This function will do nothing at default - virtual void SetTrainerNum(int trainer_num) {} - // This function will do nothing at default - virtual void SetFleetSendBatchSize(int64_t size) {} virtual void SetFileListMutex(std::mutex* mutex) { mutex_for_pick_file_ = mutex; } @@ -106,21 +111,6 @@ class DataFeed { virtual void LoadIntoMemory() { PADDLE_THROW("This function(LoadIntoMemory) is not implemented."); } - virtual void LocalShuffle() { - PADDLE_THROW("This function(LocalShuffle) is not implemented."); - } - virtual void GlobalShuffle() { - PADDLE_THROW("This function(GlobalShuffle) is not implemented."); - } - // This function will do nothing at default - virtual void FillMemoryDataToChannel() {} - // This function will do nothing at default - virtual void FillChannelToMemoryData() {} - // This function will do nothing at default - virtual void PutInsToChannel(const std::string& ins_str) {} - virtual int64_t GetChannelDataSize() { return 0; } - // This function will do nothing at default - virtual void 
ReleaseChannelData() {} protected: // The following three functions are used to check if it is executed in this @@ -212,54 +202,32 @@ class PrivateQueueDataFeed : public DataFeed { }; template -class InMemoryDataFeed : public PrivateQueueDataFeed { +class InMemoryDataFeed : public DataFeed { public: InMemoryDataFeed(); virtual ~InMemoryDataFeed() {} virtual void Init(const DataFeedDesc& data_feed_desc) = 0; virtual bool Start(); virtual int Next(); - virtual void SetMemoryData(void* memory_data); - virtual void SetMemoryDataMutex(std::mutex* mutex); + virtual void SetInputChannel(void* channel); + virtual void SetOutputChannel(void* channel); + virtual void SetConsumeChannel(void* channel); virtual void SetThreadId(int thread_id); virtual void SetThreadNum(int thread_num); - virtual void SetTrainerNum(int trainer_num); - virtual void SetFleetSendBatchSize(int64_t size); - virtual void PutInsToChannel(const std::string& ins_str); - virtual void FillMemoryDataToChannel(); - virtual void FillChannelToMemoryData(); virtual void LoadIntoMemory(); - virtual void LocalShuffle(); - virtual void GlobalShuffle(); - virtual int64_t GetChannelDataSize(); - virtual void ReleaseChannelData(); protected: - virtual void AddInstanceToInsVec(T* vec_ins, const T& instance, - int index) = 0; virtual bool ParseOneInstance(T* instance) = 0; virtual bool ParseOneInstanceFromPipe(T* instance) = 0; - virtual void PutToFeedVec(const T& ins_vec) = 0; - virtual void SerializeIns(const std::vector& ins, std::string* str) = 0; - virtual void DeserializeIns(std::vector* ins, const std::string& str) = 0; - virtual std::pair GetMemoryDataInterval(); + virtual void PutToFeedVec(const std::vector& ins_vec) = 0; int thread_id_; int thread_num_; - int trainer_num_; - uint32_t rand_seed; - std::vector* memory_data_; - std::mutex* mutex_for_update_memory_data_; - // when read ins, we put ins from one channel to the other, - // and when finish reading, we set cur_channel = 1 - cur_channel, - // so if 
cur_channel=0, all data are in shuffled_ins_, else shuffled_ins_out_ - int cur_channel_; - std::shared_ptr> shuffled_ins_; - std::shared_ptr> shuffled_ins_out_; - int64_t fleet_send_batch_size_; - // sleep after send is to slow down sending data, but it's trick, - // should be removed later. - int64_t fleet_send_sleep_seconds_; + std::ifstream file_; + std::shared_ptr fp_; + paddle::framework::ChannelObject* input_channel_; + paddle::framework::ChannelObject* output_channel_; + paddle::framework::ChannelObject* consume_channel_; }; // This class define the data type of instance(ins_vec) in MultiSlotDataFeed @@ -381,6 +349,126 @@ class MultiSlotType { std::vector offset_; }; +template +paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, + const MultiSlotType& ins) { + ar << ins.GetType(); +#ifdef _LINUX + ar << ins.GetOffset(); +#else + const auto& offset = ins.GetOffset(); + ar << (uint64_t)offset.size(); + for (const size_t& x : offset) { + ar << (const uint64_t)x; + } +#endif + ar << ins.GetFloatData(); + ar << ins.GetUint64Data(); + return ar; +} + +template +paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, + MultiSlotType& ins) { + ar >> ins.MutableType(); +#ifdef _LINUX + ar >> ins.MutableOffset(); +#else + auto& offset = ins.MutableOffset(); + offset.resize(ar.template Get()); + for (size_t& x : offset) { + uint64_t t; + ar >> t; + x = (size_t)t; + } +#endif + ar >> ins.MutableFloatData(); + ar >> ins.MutableUint64Data(); + return ar; +} + +union FeatureKey { + uint64_t uint64_feasign_; + float float_feasign_; +}; + +struct FeatureItem { + FeatureItem() {} + FeatureItem(FeatureKey sign, uint16_t slot) { + this->sign() = sign; + this->slot() = slot; + } + FeatureKey& sign() { return *(reinterpret_cast(sign_buffer())); } + const FeatureKey& sign() const { + const FeatureKey* ret = reinterpret_cast(sign_buffer()); + return *ret; + } + uint16_t& slot() { return slot_; } + const uint16_t& slot() const { return slot_; } + + 
private: + char* sign_buffer() const { return const_cast(sign_); } + char sign_[sizeof(FeatureKey)]; + uint16_t slot_; +}; + +// sizeof Record is much less than std::vector +struct Record { + std::vector uint64_feasigns_; + std::vector float_feasigns_; + std::string ins_id_; +}; + +template +paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, + const FeatureKey& fk) { + ar << fk.uint64_feasign_; + ar << fk.float_feasign_; + return ar; +} + +template +paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, + FeatureKey& fk) { + ar >> fk.uint64_feasign_; + ar >> fk.float_feasign_; + return ar; +} + +template +paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, + const FeatureItem& fi) { + ar << fi.sign(); + ar << fi.slot(); + return ar; +} + +template +paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, + FeatureItem& fi) { + ar >> fi.sign(); + ar >> fi.slot(); + return ar; +} + +template +paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, + const Record& r) { + ar << r.uint64_feasigns_; + ar << r.float_feasigns_; + ar << r.ins_id_; + return ar; +} + +template +paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, + Record& r) { + ar >> r.uint64_feasigns_; + ar >> r.float_feasigns_; + ar >> r.ins_id_; + return ar; +} + // This DataFeed is used to feed multi-slot type data. // The format of multi-slot type data: // [n feasign_0 feasign_1 ... 
feasign_n]* @@ -391,7 +479,6 @@ class MultiSlotDataFeed virtual ~MultiSlotDataFeed() {} virtual void Init(const DataFeedDesc& data_feed_desc); virtual bool CheckFile(const char* filename); - // virtual void ReadThread(); protected: virtual void ReadThread(); @@ -403,24 +490,16 @@ class MultiSlotDataFeed virtual void PutToFeedVec(const std::vector& ins_vec); }; -class MultiSlotInMemoryDataFeed - : public InMemoryDataFeed> { +class MultiSlotInMemoryDataFeed : public InMemoryDataFeed { public: MultiSlotInMemoryDataFeed() {} virtual ~MultiSlotInMemoryDataFeed() {} virtual void Init(const DataFeedDesc& data_feed_desc); protected: - virtual void AddInstanceToInsVec(std::vector* vec_ins, - const std::vector& instance, - int index); - virtual bool ParseOneInstance(std::vector* instance); - virtual bool ParseOneInstanceFromPipe(std::vector* instance); - virtual void PutToFeedVec(const std::vector& ins_vec); - virtual void SerializeIns(const std::vector*>& ins, - std::string* str); - virtual void DeserializeIns(std::vector>* ins, - const std::string& str); + virtual bool ParseOneInstance(Record* instance); + virtual bool ParseOneInstanceFromPipe(Record* instance); + virtual void PutToFeedVec(const std::vector& ins_vec); }; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 1b3edeed103..119794e592c 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -32,9 +32,14 @@ namespace framework { // constructor template DatasetImpl::DatasetImpl() { + VLOG(3) << "DatasetImpl::DatasetImpl() constructor"; thread_num_ = 1; trainer_num_ = 1; + channel_num_ = 1; file_idx_ = 0; + cur_channel_ = 0; + fleet_send_batch_size_ = 80000; + fleet_send_sleep_seconds_ = 2; } // set filelist, file_idx_ will reset to zero. 
@@ -58,10 +63,6 @@ void DatasetImpl::SetThreadNum(int thread_num) { template void DatasetImpl::SetTrainerNum(int trainer_num) { trainer_num_ = trainer_num; - // should inform reader of trainer_num directly - for (auto reader : readers_) { - reader->SetTrainerNum(trainer_num); - } } // if you run distributed, and want to do global shuffle, @@ -70,9 +71,6 @@ void DatasetImpl::SetTrainerNum(int trainer_num) { template void DatasetImpl::SetFleetSendBatchSize(int64_t size) { fleet_send_batch_size_ = size; - for (auto reader : readers_) { - reader->SetFleetSendBatchSize(size); - } } template @@ -92,12 +90,38 @@ void DatasetImpl::SetDataFeedDesc(const std::string& data_feed_desc_str) { &data_feed_desc_); } -// readers_.size() may not be equal to thread_num_, -// it changes when filelist_.size() < thread_num_ template -std::vector>& -DatasetImpl::GetReaders() { - return readers_; +void DatasetImpl::SetChannelNum(int channel_num) { + channel_num_ = channel_num; +} + +template +std::vector DatasetImpl::GetReaders() { + std::vector ret; + ret.reserve(readers_.size()); + for (auto i : readers_) { + ret.push_back(i.get()); + } + return ret; +} + +template +void DatasetImpl::CreateChannel() { + if (input_channel_ == nullptr) { + input_channel_ = paddle::framework::MakeChannel(); + } + if (multi_output_channel_.size() == 0) { + multi_output_channel_.reserve(channel_num_); + for (int i = 0; i < channel_num_; ++i) { + multi_output_channel_.push_back(paddle::framework::MakeChannel()); + } + } + if (multi_consume_channel_.size() == 0) { + multi_consume_channel_.reserve(channel_num_); + for (int i = 0; i < channel_num_; ++i) { + multi_consume_channel_.push_back(paddle::framework::MakeChannel()); + } + } } // if sent message between workers, should first call this function @@ -119,9 +143,6 @@ void DatasetImpl::LoadIntoMemory() { VLOG(3) << "DatasetImpl::LoadIntoMemory() begin"; platform::Timer timeline; timeline.Start(); - if (readers_.size() == 0) { - CreateReaders(); - } std::vector 
load_threads; for (int64_t i = 0; i < thread_num_; ++i) { load_threads.push_back(std::thread( @@ -130,20 +151,63 @@ void DatasetImpl::LoadIntoMemory() { for (std::thread& t : load_threads) { t.join(); } + input_channel_->Close(); + int64_t in_chan_size = input_channel_->Size(); + input_channel_->SetBlockSize(in_chan_size / thread_num_ + 1); timeline.Pause(); VLOG(3) << "DatasetImpl::LoadIntoMemory() end" - << ", memory data size=" << memory_data_.size() + << ", memory data size=" << input_channel_->Size() << ", cost time=" << timeline.ElapsedSec() << " seconds"; } +template +void DatasetImpl::PreLoadIntoMemory() { + VLOG(3) << "DatasetImpl::PreLoadIntoMemory() begin"; + preload_threads_.clear(); + for (int64_t i = 0; i < thread_num_; ++i) { + preload_threads_.push_back(std::thread( + &paddle::framework::DataFeed::LoadIntoMemory, readers_[i].get())); + } + VLOG(3) << "DatasetImpl::PreLoadIntoMemory() end"; +} + +template +void DatasetImpl::WaitPreLoadDone() { + VLOG(3) << "DatasetImpl::WaitPreLoadDone() begin"; + for (std::thread& t : preload_threads_) { + t.join(); + } + input_channel_->Close(); + int64_t in_chan_size = input_channel_->Size(); + input_channel_->SetBlockSize(in_chan_size / thread_num_ + 1); + VLOG(3) << "DatasetImpl::WaitPreLoadDone() end"; +} + // release memory data template void DatasetImpl::ReleaseMemory() { VLOG(3) << "DatasetImpl::ReleaseMemory() begin"; - std::vector().swap(memory_data_); - for (int i = 0; i < readers_.size(); ++i) { - readers_[i]->ReleaseChannelData(); + if (input_channel_) { + input_channel_->Clear(); + input_channel_ = nullptr; + } + for (size_t i = 0; i < multi_output_channel_.size(); ++i) { + if (!multi_output_channel_[i]) { + continue; + } + multi_output_channel_[i]->Clear(); + multi_output_channel_[i] = nullptr; } + std::vector>().swap(multi_output_channel_); + for (size_t i = 0; i < multi_consume_channel_.size(); ++i) { + if (!multi_consume_channel_[i]) { + continue; + } + multi_consume_channel_[i]->Clear(); + 
multi_consume_channel_[i] = nullptr; + } + std::vector>().swap(multi_consume_channel_); + std::vector>().swap(readers_); VLOG(3) << "DatasetImpl::ReleaseMemory() end"; } @@ -153,21 +217,22 @@ void DatasetImpl::LocalShuffle() { VLOG(3) << "DatasetImpl::LocalShuffle() begin"; platform::Timer timeline; timeline.Start(); - if (readers_.size() == 0) { - CreateReaders(); - } - // if it is not InMemory, memory_data_ is empty - std::random_shuffle(memory_data_.begin(), memory_data_.end()); - std::vector local_shuffle_threads; - for (int64_t i = 0; i < thread_num_; ++i) { - local_shuffle_threads.push_back(std::thread( - &paddle::framework::DataFeed::LocalShuffle, readers_[i].get())); - } - for (std::thread& t : local_shuffle_threads) { - t.join(); + if (!input_channel_ || input_channel_->Size() == 0) { + VLOG(3) << "DatasetImpl::LocalShuffle() end, no data to shuffle"; + return; } - std::vector().swap(memory_data_); + auto fleet_ptr = FleetWrapper::GetInstance(); + input_channel_->Close(); + std::vector data; + input_channel_->ReadAll(data); + std::shuffle(data.begin(), data.end(), fleet_ptr->LocalRandomEngine()); + input_channel_->Open(); + input_channel_->Write(std::move(data)); + data.clear(); + data.shrink_to_fit(); + input_channel_->Close(); + timeline.Pause(); VLOG(3) << "DatasetImpl::LocalShuffle() end, cost time=" << timeline.ElapsedSec() << " seconds"; @@ -178,23 +243,75 @@ void DatasetImpl::GlobalShuffle() { VLOG(3) << "DatasetImpl::GlobalShuffle() begin"; platform::Timer timeline; timeline.Start(); - if (readers_.size() == 0) { - CreateReaders(); - } auto fleet_ptr = FleetWrapper::GetInstance(); - // local shuffle all data before global shuffle - std::shuffle(memory_data_.begin(), memory_data_.end(), - fleet_ptr->LocalRandomEngine()); + + if (!input_channel_ || input_channel_->Size() == 0) { + VLOG(3) << "DatasetImpl::GlobalShuffle() end, no data to shuffle"; + return; + } + + // local shuffle + input_channel_->Close(); + std::vector data; + 
input_channel_->ReadAll(data); + std::shuffle(data.begin(), data.end(), fleet_ptr->LocalRandomEngine()); + input_channel_->Open(); + input_channel_->Write(std::move(data)); + data.clear(); + data.shrink_to_fit(); + + input_channel_->Close(); + input_channel_->SetBlockSize(fleet_send_batch_size_); + VLOG(3) << "DatasetImpl::GlobalShuffle() input_channel_ size " + << input_channel_->Size(); + + auto global_shuffle_func = [this]() { + auto fleet_ptr = FleetWrapper::GetInstance(); + std::vector data; + while (this->input_channel_->Read(data)) { + std::vector ars(this->trainer_num_); + for (auto& t : data) { + auto client_id = fleet_ptr->LocalRandomEngine()() % this->trainer_num_; + ars[client_id] << t; + } + std::vector> total_status; + std::vector send_index(this->trainer_num_); + for (int i = 0; i < this->trainer_num_; ++i) { + send_index[i] = i; + } + std::shuffle(send_index.begin(), send_index.end(), + fleet_ptr->LocalRandomEngine()); + for (auto index = 0u; index < this->trainer_num_; ++index) { + int i = send_index[index]; + if (ars[i].Length() == 0) { + continue; + } + std::string msg(ars[i].Buffer(), ars[i].Length()); + auto ret = fleet_ptr->SendClientToClientMsg(0, i, msg); + total_status.push_back(std::move(ret)); + } + for (auto& t : total_status) { + t.wait(); + } + ars.clear(); + ars.shrink_to_fit(); + data.clear(); + data.shrink_to_fit(); + sleep(this->fleet_send_sleep_seconds_); + } + }; + VLOG(3) << "start global shuffle threads"; std::vector global_shuffle_threads; for (int i = 0; i < thread_num_; ++i) { - global_shuffle_threads.push_back(std::thread( - &paddle::framework::DataFeed::GlobalShuffle, readers_[i].get())); + global_shuffle_threads.push_back(std::thread(global_shuffle_func)); } for (std::thread& t : global_shuffle_threads) { t.join(); } - std::vector().swap(memory_data_); + global_shuffle_threads.clear(); + global_shuffle_threads.shrink_to_fit(); + input_channel_->Clear(); timeline.Pause(); VLOG(3) << "DatasetImpl::GlobalShuffle() end, cost 
time=" << timeline.ElapsedSec() << " seconds"; @@ -203,78 +320,67 @@ void DatasetImpl::GlobalShuffle() { template void DatasetImpl::CreateReaders() { VLOG(3) << "Calling CreateReaders()"; - CHECK(thread_num_ > 0) << "thread_num should > 0"; - int file_cnt = filelist_.size(); - int memory_data_size = memory_data_.size(); - if (memory_data_size != 0 && thread_num_ > memory_data_size) { - VLOG(3) << "Dataset thread num = " << thread_num_ - << ", memory data size = " << memory_data_size - << ". Changing Dataset thread num = " << memory_data_size; - thread_num_ = memory_data_size; - } else if (file_cnt != 0 && thread_num_ > file_cnt) { - VLOG(3) << "Dataset thread num = " << thread_num_ - << ", file num = " << file_cnt - << ". Changing Dataset thread num = " << file_cnt; - thread_num_ = file_cnt; - } - VLOG(3) << "thread_num in Readers: " << thread_num_; + VLOG(3) << "thread num in Dataset: " << thread_num_; + VLOG(3) << "Filelist size in Dataset: " << filelist_.size(); + VLOG(3) << "channel num in Dataset: " << channel_num_; + CHECK(thread_num_ > 0) << "thread num should > 0"; + CHECK(thread_num_ <= filelist_.size()) + << "thread num should <= filelist size"; + CHECK(channel_num_ > 0) << "channel num should > 0"; + CHECK(channel_num_ <= thread_num_) << "channel num should <= thread num"; VLOG(3) << "readers size: " << readers_.size(); - VLOG(3) << "Filelist size in readers: " << filelist_.size(); if (readers_.size() != 0) { + VLOG(3) << "readers_.size() = " << readers_.size() + << ", will not create again"; return; } VLOG(3) << "data feed class name: " << data_feed_desc_.name(); + int channel_idx = 0; for (int i = 0; i < thread_num_; ++i) { readers_.push_back(DataFeedFactory::CreateDataFeed(data_feed_desc_.name())); - readers_.back()->Init(data_feed_desc_); - readers_.back()->SetMemoryData(&memory_data_); - readers_.back()->SetMemoryDataMutex(&mutex_for_update_memory_data_); - readers_.back()->SetThreadId(i); - readers_.back()->SetThreadNum(thread_num_); - 
readers_.back()->SetTrainerNum(trainer_num_); - readers_.back()->SetFileListMutex(&mutex_for_pick_file_); - readers_.back()->SetFileListIndex(&file_idx_); - readers_.back()->SetFileList(filelist_); + readers_[i]->Init(data_feed_desc_); + readers_[i]->SetThreadId(i); + readers_[i]->SetThreadNum(thread_num_); + readers_[i]->SetFileListMutex(&mutex_for_pick_file_); + readers_[i]->SetFileListIndex(&file_idx_); + readers_[i]->SetFileList(filelist_); + if (input_channel_ != nullptr) { + readers_[i]->SetInputChannel(input_channel_.get()); + } + if (cur_channel_ == 0 && channel_idx < multi_output_channel_.size()) { + readers_[i]->SetOutputChannel(multi_output_channel_[channel_idx].get()); + readers_[i]->SetConsumeChannel(multi_consume_channel_[channel_idx].get()); + } else if (channel_idx < multi_output_channel_.size()) { + readers_[i]->SetOutputChannel(multi_consume_channel_[channel_idx].get()); + readers_[i]->SetConsumeChannel(multi_output_channel_[channel_idx].get()); + } + ++channel_idx; + if (channel_idx >= channel_num_) { + channel_idx = 0; + } } + VLOG(3) << "readers size: " << readers_.size(); } template void DatasetImpl::DestroyReaders() { VLOG(3) << "Calling DestroyReaders()"; - // clear memory_data_ before fill it - // because if LoadIntoMemory but no Shuffle, - // memory_data_ has empty data which has been std::move to channel - if (memory_data_.size() != 0) { - std::vector().swap(memory_data_); - } - std::vector fill_threads; - for (int i = 0; i < thread_num_; ++i) { - fill_threads.push_back( - std::thread(&paddle::framework::DataFeed::FillChannelToMemoryData, - readers_[i].get())); - } - for (std::thread& t : fill_threads) { - t.join(); - } std::vector>().swap(readers_); VLOG(3) << "readers size: " << readers_.size(); - // if memory_data_ is empty, which means it's not InMemory mode, - // so the next epoch should read all data again - if (memory_data_.size() == 0) { - file_idx_ = 0; - } + file_idx_ = 0; + cur_channel_ = 1 - cur_channel_; } template int64_t 
DatasetImpl::GetMemoryDataSize() { - return memory_data_.size(); + return input_channel_->Size(); } template int64_t DatasetImpl::GetShuffleDataSize() { int64_t sum = 0; - for (int i = 0; i < readers_.size(); ++i) { - sum += readers_[i]->GetChannelDataSize(); + for (size_t i = 0; i < multi_output_channel_.size(); ++i) { + sum += multi_output_channel_[i]->Size() + multi_consume_channel_[i]->Size(); } return sum; } @@ -285,16 +391,34 @@ int DatasetImpl::ReceiveFromClient(int msg_type, int client_id, #ifdef _LINUX VLOG(3) << "ReceiveFromClient msg_type=" << msg_type << ", client_id=" << client_id << ", msg length=" << msg.length(); + if (msg.length() == 0) { + return 0; + } + paddle::framework::BinaryArchive ar; + ar.SetReadBuffer(const_cast(msg.c_str()), msg.length(), nullptr); + if (ar.Cursor() == ar.Finish()) { + return 0; + } + std::vector data; + while (ar.Cursor() < ar.Finish()) { + data.push_back(ar.Get()); + } + CHECK(ar.Cursor() == ar.Finish()); + auto fleet_ptr = FleetWrapper::GetInstance(); - int64_t index = fleet_ptr->LocalRandomEngine()() % thread_num_; + int64_t index = fleet_ptr->LocalRandomEngine()() % channel_num_; VLOG(3) << "ramdom index=" << index; - readers_[index]->PutInsToChannel(msg); + multi_output_channel_[index]->Write(std::move(data)); + + data.clear(); + data.shrink_to_fit(); #endif return 0; } // explicit instantiation template class DatasetImpl>; +template class DatasetImpl; } // end namespace framework } // end namespace paddle diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index ffbc7bfd95b..bb17b25c5a8 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -55,6 +55,8 @@ class Dataset { // set data fedd desc, which contains: // data feed name, batch size, slots virtual void SetDataFeedDesc(const std::string& data_feed_desc_str) = 0; + // set channel num + virtual void SetChannelNum(int channel_num) = 0; // get file list virtual const std::vector& GetFileList() = 0; 
// get thread num @@ -67,14 +69,21 @@ class Dataset { virtual std::pair GetHdfsConfig() = 0; // get data fedd desc virtual const paddle::framework::DataFeedDesc& GetDataFeedDesc() = 0; + // get channel num + virtual int GetChannelNum() = 0; // get readers, the reader num depend both on thread num // and filelist size - virtual std::vector>& - GetReaders() = 0; + virtual std::vector GetReaders() = 0; + // create input channel and output channel + virtual void CreateChannel() = 0; // register message handler between workers virtual void RegisterClientToClientMsgHandler() = 0; // load all data into memory virtual void LoadIntoMemory() = 0; + // load all data into memory in async mode + virtual void PreLoadIntoMemory() = 0; + // wait async load done + virtual void WaitPreLoadDone() = 0; // release all memory data virtual void ReleaseMemory() = 0; // local shuffle data @@ -110,6 +119,7 @@ class DatasetImpl : public Dataset { virtual void SetHdfsConfig(const std::string& fs_name, const std::string& fs_ugi); virtual void SetDataFeedDesc(const std::string& data_feed_desc_str); + virtual void SetChannelNum(int channel_num); virtual const std::vector& GetFileList() { return filelist_; } virtual int GetThreadNum() { return thread_num_; } @@ -121,11 +131,13 @@ class DatasetImpl : public Dataset { virtual const paddle::framework::DataFeedDesc& GetDataFeedDesc() { return data_feed_desc_; } - virtual std::vector>& - GetReaders(); - + virtual int GetChannelNum() { return channel_num_; } + virtual std::vector GetReaders(); + virtual void CreateChannel(); virtual void RegisterClientToClientMsgHandler(); virtual void LoadIntoMemory(); + virtual void PreLoadIntoMemory(); + virtual void WaitPreLoadDone(); virtual void ReleaseMemory(); virtual void LocalShuffle(); virtual void GlobalShuffle(); @@ -138,8 +150,14 @@ class DatasetImpl : public Dataset { virtual int ReceiveFromClient(int msg_type, int client_id, const std::string& msg); std::vector> readers_; - std::vector memory_data_; - 
std::mutex mutex_for_update_memory_data_; + paddle::framework::Channel input_channel_; + int channel_num_; + std::vector> multi_output_channel_; + std::vector> multi_consume_channel_; + // when read ins, we put ins from one channel to the other, + // and when finish reading, we set cur_channel = 1 - cur_channel, + // so if cur_channel=0, all data are in output_channel, else consume_channel + int cur_channel_; int thread_num_; paddle::framework::DataFeedDesc data_feed_desc_; int trainer_num_; @@ -148,12 +166,13 @@ class DatasetImpl : public Dataset { std::mutex mutex_for_pick_file_; std::string fs_name_; std::string fs_ugi_; - unsigned int rand_seed; int64_t fleet_send_batch_size_; + int64_t fleet_send_sleep_seconds_; + std::vector preload_threads_; }; // use std::vector as data type -class MultiSlotDataset : public DatasetImpl> { +class MultiSlotDataset : public DatasetImpl { public: MultiSlotDataset() {} virtual ~MultiSlotDataset() {} diff --git a/paddle/fluid/framework/dataset_factory.cc b/paddle/fluid/framework/dataset_factory.cc index 60be4cf9a43..3a28c101d48 100644 --- a/paddle/fluid/framework/dataset_factory.cc +++ b/paddle/fluid/framework/dataset_factory.cc @@ -21,14 +21,14 @@ limitations under the License. 
*/ namespace paddle { namespace framework { -typedef std::shared_ptr (*CreateDatasetFunction)(); +typedef std::unique_ptr (*CreateDatasetFunction)(); typedef std::unordered_map datasetMap; datasetMap g_dataset_map; #define REGISTER_DATASET_CLASS(dataset_class) \ namespace { \ - std::shared_ptr Creator_##dataset_class() { \ - return std::shared_ptr(new dataset_class); \ + std::unique_ptr Creator_##dataset_class() { \ + return std::unique_ptr(new dataset_class); \ } \ class __Registerer_##dataset_class { \ public: \ @@ -50,7 +50,7 @@ std::string DatasetFactory::DatasetTypeList() { return dataset_types; } -std::shared_ptr DatasetFactory::CreateDataset( +std::unique_ptr DatasetFactory::CreateDataset( std::string dataset_class) { if (g_dataset_map.count(dataset_class) < 1) { LOG(WARNING) << "Your Dataset " << dataset_class diff --git a/paddle/fluid/framework/dataset_factory.h b/paddle/fluid/framework/dataset_factory.h index 2894b69f8fa..d4a36cec22f 100644 --- a/paddle/fluid/framework/dataset_factory.h +++ b/paddle/fluid/framework/dataset_factory.h @@ -23,7 +23,7 @@ namespace framework { class DatasetFactory { public: static std::string DatasetTypeList(); - static std::shared_ptr CreateDataset(std::string dataset_class); + static std::unique_ptr CreateDataset(std::string dataset_class); }; } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/device_worker.cc b/paddle/fluid/framework/device_worker.cc index 443acf0a163..7fe60b4446a 100644 --- a/paddle/fluid/framework/device_worker.cc +++ b/paddle/fluid/framework/device_worker.cc @@ -19,7 +19,7 @@ namespace framework { void DeviceWorker::SetRootScope(Scope* root_scope) { root_scope_ = root_scope; } -void DeviceWorker::SetDataFeed(const std::shared_ptr& data_feed) { +void DeviceWorker::SetDataFeed(DataFeed* data_feed) { device_reader_ = data_feed; } diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index be5f663e1c9..035392ef318 100644 --- 
a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -113,7 +113,7 @@ class DeviceWorker { // will make this zero copy in the future virtual void BindingDataFeedMemory() = 0; virtual void SetRootScope(Scope* root_scope); - virtual void SetDataFeed(const std::shared_ptr& data_feed); + virtual void SetDataFeed(DataFeed* data_feed); virtual void SetPlace(const paddle::platform::Place& place) { place_ = place; } @@ -121,7 +121,7 @@ class DeviceWorker { protected: Scope* root_scope_; paddle::platform::Place place_; - std::shared_ptr device_reader_; + DataFeed* device_reader_; int64_t batch_num_; FetchConfig fetch_config_; bool use_cvm_; diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc index 481e12fcd63..8cd0789c0ae 100644 --- a/paddle/fluid/framework/dist_multi_trainer.cc +++ b/paddle/fluid/framework/dist_multi_trainer.cc @@ -27,8 +27,7 @@ void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc, thread_num_ = trainer_desc.thread_num(); SetDataset(dataset); - dataset->CreateReaders(); - const std::vector> readers = + const std::vector readers = dataset->GetReaders(); thread_num_ = readers.size(); @@ -72,7 +71,6 @@ void DistMultiTrainer::Finalize() { th.join(); } pull_dense_worker_->Stop(); - dataset_ptr_->DestroyReaders(); root_scope_->DropKids(); } diff --git a/paddle/fluid/framework/expect.h b/paddle/fluid/framework/expect.h new file mode 100644 index 00000000000..146f4de9382 --- /dev/null +++ b/paddle/fluid/framework/expect.h @@ -0,0 +1,32 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +#ifdef _LINUX +#ifndef likely +#define likely(x) __builtin_expect((x), 1) +#endif +#endif + +#ifdef _LINUX +#ifndef unlikely +#define unlikely(x) __builtin_expect((x), 0) +#endif +#endif diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc index 3a266e4bda9..8cbf2efa81a 100644 --- a/paddle/fluid/framework/multi_trainer.cc +++ b/paddle/fluid/framework/multi_trainer.cc @@ -26,9 +26,7 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc, thread_num_ = trainer_desc.thread_num(); SetDataset(dataset); // get filelist from trainer_desc here - dataset->CreateReaders(); - VLOG(3) << "readers created"; - const std::vector> readers = + const std::vector readers = dataset->GetReaders(); VLOG(3) << "readers num: " << readers.size(); // change thread num to readers num @@ -75,7 +73,6 @@ void MultiTrainer::Finalize() { for (auto& th : threads_) { th.join(); } - dataset_ptr_->DestroyReaders(); root_scope_->DropKids(); } diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index 3edffd434a1..916359ab6b1 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -28,9 +28,7 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc, SetDataset(dataset); // get filelist from trainer_desc here - dataset->CreateReaders(); - VLOG(3) << "readers created"; - const std::vector> readers = + const std::vector readers = 
dataset->GetReaders(); VLOG(3) << "readers num: " << readers.size(); @@ -259,7 +257,6 @@ void PipelineTrainer::Finalize() { pipeline_scopes_[0]->FindVar(var)->Get(); TensorCopySync(thread_tensor, platform::CPUPlace(), root_tensor); } - dataset_ptr_->DestroyReaders(); root_scope_->DropKids(); } diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index b491725974c..5fe296ff20d 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -74,7 +74,7 @@ class MultiTrainer : public TrainerBase { protected: int thread_num_; std::vector threads_; - std::vector> readers_; + std::vector readers_; std::vector> workers_; }; @@ -136,7 +136,7 @@ class PipelineTrainer : public TrainerBase { std::vector> sync_functors_; std::shared_ptr nccl_ctx_map_; - std::vector> readers_; + std::vector readers_; void InitFirstScopeQueue(ScopeQueue* scope_queue, int pipeline_id, const ProgramDesc& main_program); diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index 3e2c976076a..dd94ac500cf 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -42,33 +42,64 @@ namespace paddle { namespace pybind { void BindDataset(py::module* m) { - py::class_>(*m, + py::class_>(*m, "Dataset") .def(py::init([](const std::string& name = "MultiSlotDataset") { return framework::DatasetFactory::CreateDataset(name); })) - .def("set_filelist", &framework::Dataset::SetFileList) - .def("set_thread_num", &framework::Dataset::SetThreadNum) - .def("set_trainer_num", &framework::Dataset::SetTrainerNum) + .def("set_filelist", &framework::Dataset::SetFileList, + py::call_guard()) + .def("set_thread_num", &framework::Dataset::SetThreadNum, + py::call_guard()) + .def("set_trainer_num", &framework::Dataset::SetTrainerNum, + py::call_guard()) .def("set_fleet_send_batch_size", - &framework::Dataset::SetFleetSendBatchSize) - .def("set_hdfs_config", &framework::Dataset::SetHdfsConfig) - 
.def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc) - .def("get_filelist", &framework::Dataset::GetFileList) - .def("get_thread_num", &framework::Dataset::GetThreadNum) - .def("get_trainer_num", &framework::Dataset::GetTrainerNum) + &framework::Dataset::SetFleetSendBatchSize, + py::call_guard()) + .def("set_hdfs_config", &framework::Dataset::SetHdfsConfig, + py::call_guard()) + .def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc, + py::call_guard()) + .def("get_filelist", &framework::Dataset::GetFileList, + py::call_guard()) + .def("get_thread_num", &framework::Dataset::GetThreadNum, + py::call_guard()) + .def("get_trainer_num", &framework::Dataset::GetTrainerNum, + py::call_guard()) .def("get_fleet_send_batch_size", - &framework::Dataset::GetFleetSendBatchSize) - .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig) - .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc) + &framework::Dataset::GetFleetSendBatchSize, + py::call_guard()) + .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig, + py::call_guard()) + .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc, + py::call_guard()) .def("register_client2client_msg_handler", - &framework::Dataset::RegisterClientToClientMsgHandler) - .def("load_into_memory", &framework::Dataset::LoadIntoMemory) - .def("release_memory", &framework::Dataset::ReleaseMemory) - .def("local_shuffle", &framework::Dataset::LocalShuffle) - .def("global_shuffle", &framework::Dataset::GlobalShuffle) - .def("get_memory_data_size", &framework::Dataset::GetMemoryDataSize) - .def("get_shuffle_data_size", &framework::Dataset::GetShuffleDataSize); + &framework::Dataset::RegisterClientToClientMsgHandler, + py::call_guard()) + .def("create_channel", &framework::Dataset::CreateChannel, + py::call_guard()) + .def("create_readers", &framework::Dataset::CreateReaders, + py::call_guard()) + .def("destroy_readers", &framework::Dataset::DestroyReaders, + py::call_guard()) + 
.def("load_into_memory", &framework::Dataset::LoadIntoMemory, + py::call_guard()) + .def("preload_into_memory", &framework::Dataset::PreLoadIntoMemory, + py::call_guard()) + .def("wait_preload_done", &framework::Dataset::WaitPreLoadDone, + py::call_guard()) + .def("release_memory", &framework::Dataset::ReleaseMemory, + py::call_guard()) + .def("local_shuffle", &framework::Dataset::LocalShuffle, + py::call_guard()) + .def("global_shuffle", &framework::Dataset::GlobalShuffle, + py::call_guard()) + .def("get_memory_data_size", &framework::Dataset::GetMemoryDataSize, + py::call_guard()) + .def("get_shuffle_data_size", &framework::Dataset::GetShuffleDataSize, + py::call_guard()) + .def("set_queue_num", &framework::Dataset::SetChannelNum, + py::call_guard()); } } // end namespace pybind diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index b3d58a589bd..8c886ca23b6 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -71,6 +71,7 @@ class DatasetBase(object): self.proto_desc.pipe_command = "cat" self.dataset = core.Dataset("MultiSlotDataset") self.thread_num = 0 + self.filelist = [] def set_pipe_command(self, pipe_command): """ @@ -139,6 +140,7 @@ class DatasetBase(object): filelist(list): file list """ self.dataset.set_filelist(filelist) + self.filelist = filelist def set_use_var(self, var_list): """ @@ -193,7 +195,14 @@ class DatasetBase(object): Set data_feed_desc before load or shuffle, user no need to call this function. """ + if self.thread_num > len(self.filelist): + self.thread_num = len(self.filelist) + self.dataset.set_thread_num(self.thread_num) self.dataset.set_data_feed_desc(self.desc()) + self.dataset.create_readers() + + def _finish_to_run(self): + self.dataset.destroy_readers() def desc(self): """ @@ -226,6 +235,57 @@ class InMemoryDataset(DatasetBase): """ Init. 
""" super(InMemoryDataset, self).__init__() self.proto_desc.name = "MultiSlotInMemoryDataFeed" + self.fleet_send_batch_size = 80000 + self.queue_num = None + + def _prepare_to_run(self): + """ + Set data_feed_desc before load or shuffle, + user no need to call this function. + """ + if self.thread_num > len(self.filelist): + self.thread_num = len(self.filelist) + self.dataset.set_thread_num(self.thread_num) + if self.queue_num is None: + self.queue_num = self.thread_num + self.dataset.set_queue_num(self.queue_num) + self.dataset.set_data_feed_desc(self.desc()) + self.dataset.create_channel() + self.dataset.create_readers() + + def set_queue_num(self, queue_num): + """ + Set Dataset output queue num, training threads get data from queues + + Args: + set_queue_num(int): dataset output queue num + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_queue_num(12) + + """ + self.queue_num = queue_num + + def set_fleet_send_batch_size(self, fleet_send_batch_size): + """ + Set fleet send batch size, default is 80000 + + Args: + fleet_send_batch_size(int): fleet send batch size + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_fleet_send_batch_size(800) + + """ + self.fleet_send_batch_size = fleet_send_batch_size def load_into_memory(self): """ @@ -243,6 +303,39 @@ class InMemoryDataset(DatasetBase): self._prepare_to_run() self.dataset.load_into_memory() + def preload_into_memory(self): + """ + Load data into memory in async mode + + Examples: + .. 
code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.preload_into_memory() + dataset.wait_preload_done() + """ + self._prepare_to_run() + self.dataset.preload_into_memory() + + def wait_preload_done(self): + """ + Wait preload_into_memory done + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.preload_into_memory() + dataset.wait_preload_done() + """ + self.dataset.wait_preload_done() + def local_shuffle(self): """ Local shuffle @@ -282,13 +375,12 @@ class InMemoryDataset(DatasetBase): """ trainer_num = 1 - fleet_send_batch_size = 80000 if fleet is not None: fleet._role_maker._barrier_worker() trainer_num = fleet.worker_num() self.dataset.register_client2client_msg_handler() self.dataset.set_trainer_num(trainer_num) - self.dataset.set_fleet_send_batch_size(fleet_send_batch_size) + self.dataset.set_fleet_send_batch_size(self.fleet_send_batch_size) if fleet is not None: fleet._role_maker._barrier_worker() self.dataset.global_shuffle() diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 1bca5bb3a14..a44570abbbe 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -889,6 +889,7 @@ class Executor(object): if dataset == None: raise RuntimeError("dataset is needed and should be initialized") + dataset._prepare_to_run() scope, trainer = self._prepare_trainer( program=program, dataset=dataset, @@ -900,11 +901,11 @@ class Executor(object): print_period=print_period) trainer._set_infer(True) trainer._gen_trainer_desc() - dataset._prepare_to_run() self._dump_debug_info(program=program, trainer=trainer) self._default_executor.run_from_dataset(program.desc, scope, dataset.dataset, trainer._desc()) + 
dataset._finish_to_run() return None def train_from_dataset(self, @@ -969,6 +970,7 @@ class Executor(object): if dataset == None: raise RuntimeError("dataset is need and should be initialized") + dataset._prepare_to_run() scope, trainer = self._prepare_trainer( program=program, dataset=dataset, @@ -979,9 +981,9 @@ class Executor(object): fetch_info=fetch_info, print_period=print_period) trainer._gen_trainer_desc() - dataset._prepare_to_run() self._dump_debug_info(program=program, trainer=trainer) self._default_executor.run_from_dataset(program.desc, scope, dataset.dataset, trainer._desc()) + dataset._finish_to_run() return None -- GitLab