diff --git a/paddle/fluid/framework/archive.h b/paddle/fluid/framework/archive.h new file mode 100644 index 0000000000000000000000000000000000000000..100eb9518f71e76134e1baf4da9d1c569880a2db --- /dev/null +++ b/paddle/fluid/framework/archive.h @@ -0,0 +1,609 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "paddle/fluid/framework/expect.h" + +namespace paddle { +namespace framework { + +// not a virtual class +class ArchiveBase { + protected: + ArchiveBase() {} + + // Archive is not copyable. But to allow move capture by function objects, + // check it at runtime rather than at compile time. 
+ ArchiveBase(const ArchiveBase&) { LOG(FATAL) << "Not supported"; } + + ArchiveBase(ArchiveBase&& other) + : buffer_(other.buffer_), + cursor_(other.cursor_), + finish_(other.finish_), + limit_(other.limit_), + deleter_(std::move(other.deleter_)) { + other.buffer_ = NULL; + other.cursor_ = NULL; + other.finish_ = NULL; + other.limit_ = NULL; + other.deleter_ = nullptr; + } + + ~ArchiveBase() { FreeBuffer(); } + + public: + ArchiveBase& operator=(const ArchiveBase&) { + LOG(FATAL) << "Not supported"; + return *this; + } + + ArchiveBase& operator=(ArchiveBase&& other) { + if (this != &other) { + FreeBuffer(); + buffer_ = other.buffer_; + cursor_ = other.cursor_; + finish_ = other.finish_; + limit_ = other.limit_; + deleter_ = std::move(other.deleter_); + other.buffer_ = NULL; + other.cursor_ = NULL; + other.finish_ = NULL; + other.limit_ = NULL; + other.deleter_ = nullptr; + } + return *this; + } + + char* Buffer() { return buffer_; } + + void SetReadBuffer(char* buffer, size_t length, + std::function&& deleter) { + SetBuffer(buffer, length, length, std::move(deleter)); + } + + void SetWriteBuffer(char* buffer, size_t capacity, + std::function&& deleter) { + SetBuffer(buffer, 0, capacity, std::move(deleter)); + } + + void SetBuffer(char* buffer, size_t length, size_t capacity, + std::function&& deleter) { + CHECK(length <= capacity); + FreeBuffer(); + buffer_ = buffer; + cursor_ = buffer_; + finish_ = buffer + length; + limit_ = buffer + capacity; + deleter_ = std::move(deleter); + } + + char* Cursor() { return cursor_; } + + void SetCursor(char* cursor) { + CHECK(cursor >= buffer_ && cursor <= finish_); + cursor_ = cursor; + } + + void AdvanceCursor(size_t offset) { + CHECK(offset <= size_t(finish_ - cursor_)); + cursor_ += offset; + } + + char* Finish() { return finish_; } + + void SetFinish(char* finish) { + CHECK(finish >= cursor_ && finish <= limit_); + finish_ = finish; + } + + void AdvanceFinish(size_t offset) { + CHECK(offset <= size_t(limit_ - finish_)); + 
finish_ += offset; + } + + char* Limit() { return limit_; } + + size_t Position() { return cursor_ - buffer_; } + + size_t Length() { return finish_ - buffer_; } + + size_t Capacity() { return limit_ - buffer_; } + + bool Empty() { return finish_ == buffer_; } + + void Reset() { + FreeBuffer(); + buffer_ = NULL; + cursor_ = NULL; + finish_ = NULL; + limit_ = NULL; + } + + void Clear() { + cursor_ = buffer_; + finish_ = buffer_; + } + + // Hands the buffer over to the caller: returns buffer_ and drops the deleter + // without running it. The archive is left with no buffer at all, i.e. every + // pointer is NULL, the same pointer state Reset() leaves behind. + char* Release() { + char* buf = buffer_; + buffer_ = NULL; + cursor_ = NULL; + finish_ = NULL; + // limit_ has to be cleared as well. With a stale limit_, Capacity() and + // PrepareWrite() would still see room in the released buffer, skip + // Reserve() and let Write() copy through the now-NULL finish_. + limit_ = NULL; + deleter_ = nullptr; + return buf; + } + + // Sets the length to newsize, growing the buffer (to at least twice the + // current capacity) when it is too small; the cursor is clamped to the end. + void Resize(size_t newsize) { +#ifdef _LINUX + if (unlikely(newsize > Capacity())) { +#else + if (newsize > Capacity()) { +#endif + Reserve(std::max(Capacity() * 2, newsize)); + } + finish_ = buffer_ + newsize; + cursor_ = std::min(cursor_, finish_); + } + + // Grows the buffer to newcap bytes when that exceeds Capacity(): the first + // Length() bytes are copied over and cursor_/finish_/limit_ are re-based + // onto the new allocation before the old buffer is freed. + void Reserve(size_t newcap) { + if (newcap > Capacity()) { + char* newbuf = NULL; + newbuf = new char[newcap]; + CHECK(newbuf != nullptr) << "Reserve failed, out of memory"; + if (Length() > 0) { + memcpy(newbuf, buffer_, Length()); + } + cursor_ = newbuf + (cursor_ - buffer_); + finish_ = newbuf + (finish_ - buffer_); + limit_ = newbuf + newcap; + FreeBuffer(); + buffer_ = newbuf; + deleter_ = std::default_delete(); + } + } + + // Fails the CHECK unless at least `size` bytes remain between cursor_ and + // finish_. + void PrepareRead(size_t size) { +#ifdef _LINUX + if (unlikely(!(size <= size_t(finish_ - cursor_)))) { +#else + if (!(size <= size_t(finish_ - cursor_))) { +#endif + CHECK(size <= size_t(finish_ - cursor_)); + } + } + + // Makes sure `size` more bytes fit behind finish_, reserving at least twice + // the current capacity when they do not. + void PrepareWrite(size_t size) { +#ifdef _LINUX + if (unlikely(size > size_t(limit_ - finish_))) { +#else + if (size > size_t(limit_ - finish_)) { +#endif + Reserve(std::max(Capacity() * 2, Length() + size)); + } + } + + // Copies `size` bytes starting at the cursor into data and advances the + // cursor past them. + void Read(void* data, size_t size) { + if (size > 0) { + PrepareRead(size); + memcpy(data, cursor_, size); + AdvanceCursor(size); + } + } + + void ReadBack(void* data, size_t size) { + if (size > 0) { + CHECK(size <= size_t(finish_ - cursor_)); + memcpy(data, finish_ - size, size); + 
finish_ -= size; + } + } + + void Write(const void* data, size_t size) { + if (size > 0) { + PrepareWrite(size); + memcpy(finish_, data, size); + AdvanceFinish(size); + } + } + + template + void GetRaw(T& x) { // NOLINT + PrepareRead(sizeof(T)); + memcpy(&x, cursor_, sizeof(T)); + AdvanceCursor(sizeof(T)); + } + + template + T GetRaw() { + T x; + GetRaw(x); + return x; + } + + template + void PutRaw(const T& x) { + PrepareWrite(sizeof(T)); + memcpy(finish_, &x, sizeof(T)); + AdvanceFinish(sizeof(T)); + } + + protected: + char* buffer_ = NULL; + char* cursor_ = NULL; + char* finish_ = NULL; + char* limit_ = NULL; + std::function deleter_ = nullptr; + + void FreeBuffer() { + if (deleter_) { + deleter_(buffer_); + } + deleter_ = nullptr; + } +}; // NOLINT + +template +class Archive {}; + +class BinaryArchiveType {}; + +typedef Archive BinaryArchive; + +template <> +class Archive : public ArchiveBase { + public: +#define ARCHIVE_REPEAT(T) \ + BinaryArchive& operator>>(T& x) { \ + GetRaw(x); \ + return *this; \ + } \ + BinaryArchive& operator<<(const T& x) { \ + PutRaw(x); \ + return *this; \ + } + + ARCHIVE_REPEAT(int16_t) + ARCHIVE_REPEAT(uint16_t) + ARCHIVE_REPEAT(int32_t) + ARCHIVE_REPEAT(uint32_t) + ARCHIVE_REPEAT(int64_t) + ARCHIVE_REPEAT(uint64_t) + ARCHIVE_REPEAT(float) + ARCHIVE_REPEAT(double) + ARCHIVE_REPEAT(signed char) + ARCHIVE_REPEAT(unsigned char) + ARCHIVE_REPEAT(bool) + +#undef ARCHIVE_REPEAT + + template + T Get() { + T x; + *this >> x; + return x; + } +}; + +template +Archive& operator<<(Archive& ar, const T (&p)[N]) { + for (size_t i = 0; i < N; i++) { + ar << p[i]; + } + return ar; +} + +template +Archive& operator>>(Archive& ar, T (&p)[N]) { + for (size_t i = 0; i < N; i++) { + ar >> p[i]; + } + return ar; +} + +template +Archive& operator<<(Archive& ar, const std::vector& p) { +#ifdef _LINUX + ar << (size_t)p.size(); +#else + ar << (uint64_t)p.size(); +#endif + for (const auto& x : p) { + ar << x; + } + return ar; +} + +template +Archive& 
operator>>(Archive& ar, std::vector& p) { +#ifdef _LINUX + p.resize(ar.template Get()); +#else + p.resize(ar.template Get()); +#endif + for (auto& x : p) { + ar >> x; + } + return ar; +} + +template +Archive& operator<<(Archive& ar, const std::valarray& p) { +#ifdef _LINUX + ar << (size_t)p.size(); +#else + ar << (uint64_t)p.size(); +#endif + for (const auto& x : p) { + ar << x; + } + return ar; +} + +template +Archive& operator>>(Archive& ar, std::valarray& p) { +#ifdef _LINUX + p.resize(ar.template Get()); +#else + p.resize(ar.template Get()); +#endif + for (auto& x : p) { + ar >> x; + } + return ar; +} + +// A string is stored as its length followed by its raw bytes. +inline BinaryArchive& operator<<(BinaryArchive& ar, const std::string& s) { +#ifdef _LINUX + ar << (size_t)s.length(); +#else + ar << (uint64_t)s.length(); +#endif + ar.Write(&s[0], s.length()); + return ar; +} + +// Reads the length first, checks that this many bytes are left, then copies +// them out of the buffer and advances the cursor. +inline BinaryArchive& operator>>(BinaryArchive& ar, std::string& s) { +#ifdef _LINUX + size_t len = ar.template Get(); +#else + size_t len = ar.template Get(); +#endif + ar.PrepareRead(len); + s.assign(ar.Cursor(), len); + ar.AdvanceCursor(len); + return ar; +} + +template +Archive& operator<<(Archive& ar, const std::pair& x) { + return ar << x.first << x.second; +} + +template +Archive& operator>>(Archive& ar, std::pair& x) { // NOLINT + return ar >> x.first >> x.second; +} + +// Terminal overload of tuple serialization: writes nothing. +#ifdef _LINUX +template +Archive& SerializeTuple(Archive& ar, // NOLINT + const std::tuple& x, // NOLINT + std::integral_constant n) { // NOLINT + return ar; +} +#else +template +Archive& SerializeTuple(Archive& ar, // NOLINT + const std::tuple& x, // NOLINT + std::integral_constant n) { // NOLINT + return ar; +} +#endif + +// Recursive step of tuple serialization. It has to be an overload of +// SerializeTuple: both its own body and operator<< for std::tuple call +// SerializeTuple, so under its former name (serialize_tuple) neither of them +// could ever reach it. +#ifdef _LINUX +template +Archive& SerializeTuple(Archive& ar, // NOLINT + const std::tuple& x, // NOLINT + std::integral_constant n) { // NOLINT + return SerializeTuple(ar, x, std::integral_constant()) + << std::get(x); +} +#else +template +Archive& SerializeTuple(Archive& ar, // NOLINT + const std::tuple& x, // NOLINT + std::integral_constant n) { // 
NOLINT + return SerializeTuple(ar, x, std::integral_constant()) + << std::get(x); +} +#endif + +#ifdef _LINUX +template +Archive& operator<<(Archive& ar, const std::tuple& x) { + const size_t size = std::tuple_size>::value; + return SerializeTuple(ar, x, std::integral_constant()); +} +#else +template +Archive& operator<<(Archive& ar, const std::tuple& x) { + const uint64_t size = std::tuple_size>::value; + return SerializeTuple(ar, x, std::integral_constant()); +} +#endif + +#ifdef _LINUX +template +Archive& DeserializeTuple(Archive& ar, std::tuple& x, // NOLINT + std::integral_constant n) { + return ar; +} +#else +template +Archive& DeserializeTuple(Archive& ar, std::tuple& x, // NOLINT + std::integral_constant n) { + return ar; +} +#endif + +#ifdef _LINUX +template +Archive& DeserializeTuple(Archive& ar, std::tuple& x, // NOLINT + std::integral_constant n) { + return DeserializeTuple(ar, x, std::integral_constant()) >> + std::get(x); +} +#else +template +Archive& DeserializeTuple(Archive& ar, std::tuple& x, // NOLINT + std::integral_constant n) { + return DeserializeTuple(ar, x, std::integral_constant()) >> + std::get(x); +} +#endif + +#ifdef _LINUX +template +Archive& operator>>(Archive& ar, std::tuple& x) { + const size_t size = std::tuple_size>::value; + return DeserializeTuple(ar, x, std::integral_constant()); +} +#else +template +Archive& operator>>(Archive& ar, std::tuple& x) { + const uint64_t size = std::tuple_size>::value; + return DeserializeTuple(ar, x, std::integral_constant()); +} +#endif + +// Generates operator<< and operator>> for a map-like container. Writing emits +// the element count followed by every entry; reading clears the container, +// runs RESERVE_STATEMENT and inserts that many entries. Values are fetched +// with Get(), which is the accessor the archive class defines (there is no +// lower-case get()). +#ifdef _LINUX +#define ARCHIVE_REPEAT(MAP_TYPE, RESERVE_STATEMENT) \ + template \ + Archive& operator<<(Archive& ar, \ + const MAP_TYPE& p) { \ + ar << (size_t)p.size(); \ + for (auto it = p.begin(); it != p.end(); ++it) { \ + ar << *it; \ + } \ + return ar; \ + } \ + template \ + Archive& operator>>(Archive& ar, MAP_TYPE& p) { \ + size_t size = ar.template Get(); \ + p.clear(); \ + RESERVE_STATEMENT; \ + for (size_t i = 0; i < size; i++) { \ + 
p.insert(ar.template Get>()); \ + } \ + return ar; \ + } +#else +#define ARCHIVE_REPEAT(MAP_TYPE, RESERVE_STATEMENT) \ + template \ + Archive& operator<<(Archive& ar, \ + const MAP_TYPE& p) { \ + ar << (uint64_t)p.size(); \ + for (auto it = p.begin(); it != p.end(); ++it) { \ + ar << *it; \ + } \ + return ar; \ + } \ + template \ + Archive& operator>>(Archive& ar, MAP_TYPE& p) { \ + size_t size = ar.template Get(); \ + p.clear(); \ + RESERVE_STATEMENT; \ + for (size_t i = 0; i < size; i++) { \ + p.insert(ar.template Get>()); \ + } \ + return ar; \ + } +#endif + +ARCHIVE_REPEAT(std::map, ) +ARCHIVE_REPEAT(std::multimap, ) +ARCHIVE_REPEAT(std::unordered_map, p.reserve(size)) +ARCHIVE_REPEAT(std::unordered_multimap, p.reserve(size)) + +#undef ARCHIVE_REPEAT + +// Same scheme for set-like containers: element count, then the elements; +// reading goes through Get() as well. +#ifdef _LINUX +#define ARCHIVE_REPEAT(SET_TYPE, RESERVE_STATEMENT) \ + template \ + Archive& operator<<(Archive& ar, const SET_TYPE& p) { \ + ar << (size_t)p.size(); \ + for (auto it = p.begin(); it != p.end(); ++it) { \ + ar << *it; \ + } \ + return ar; \ + } \ + template \ + Archive& operator>>(Archive& ar, SET_TYPE& p) { \ + size_t size = ar.template Get(); \ + p.clear(); \ + RESERVE_STATEMENT; \ + for (size_t i = 0; i < size; i++) { \ + p.insert(ar.template Get()); \ + } \ + return ar; \ + } +#else +#define ARCHIVE_REPEAT(SET_TYPE, RESERVE_STATEMENT) \ + template \ + Archive& operator<<(Archive& ar, const SET_TYPE& p) { \ + ar << (uint64_t)p.size(); \ + for (auto it = p.begin(); it != p.end(); ++it) { \ + ar << *it; \ + } \ + return ar; \ + } \ + template \ + Archive& operator>>(Archive& ar, SET_TYPE& p) { \ + size_t size = ar.template Get(); \ + p.clear(); \ + RESERVE_STATEMENT; \ + for (size_t i = 0; i < size; i++) { \ + p.insert(ar.template Get()); \ + } \ + return ar; \ + } +#endif + +ARCHIVE_REPEAT(std::set, ) +ARCHIVE_REPEAT(std::multiset, ) +ARCHIVE_REPEAT(std::unordered_set, p.reserve(size)) +ARCHIVE_REPEAT(std::unordered_multiset, p.reserve(size)) + +#undef ARCHIVE_REPEAT + +} // namespace framework 
+} // namespace paddle diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h new file mode 100644 index 0000000000000000000000000000000000000000..644f60dbebf61203c8d811aa8722e0f239018b5d --- /dev/null +++ b/paddle/fluid/framework/channel.h @@ -0,0 +1,460 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +#include +#include +#include // NOLINT +#include +#include +#include +#include // NOLINT +#include +#include +#include "paddle/fluid/framework/expect.h" + +namespace paddle { +namespace framework { + +template +class ChannelObject { + public: + ChannelObject() {} + + // capacity can be zero + explicit ChannelObject(size_t capacity) { + capacity_ = std::min(MaxCapacity(), capacity); + } + + void Clear() { + std::unique_lock lock(mutex_); + data_.clear(); + data_.shrink_to_fit(); + } + + size_t Capacity() { + return capacity_; // atomic + } + + void SetCapacity(size_t x) { // capacity can be zero + std::lock_guard lock(mutex_); + capacity_ = std::min(MaxCapacity(), x); + Notify(); + } + + size_t BlockSize() { + return block_size_; // atomic + } + + void SetBlockSize(size_t x) { + CHECK(x >= 1) << "block size must be >= 1"; + std::lock_guard lock(mutex_); + block_size_ = x; + } + + template + void InheritFrom(const std::shared_ptr>& other) { + std::lock_guard lock(mutex_); + 
capacity_ = other->Capacity(); + block_size_ = other->BlockSize(); + } + + bool Closed() { + return closed_; // atomic + } + + // open channel, then data can be write() to channel + void Open() { + std::lock_guard lock(mutex_); + closed_ = false; + Notify(); + } + + // close channel, then no more data can be write() to channel + void Close() { + std::lock_guard lock(mutex_); + closed_ = true; + Notify(); + } + + size_t Size() { + std::lock_guard lock(mutex_); + return data_.size(); + } + + bool Empty() { + std::lock_guard lock(mutex_); + return EmptyUnlocked(); + } + + // blocking operation + bool Get(T& val) { return Read(1, &val) != 0; } // NOLINT + + // blocking operation + // returns 0 if the channel is closed and empty + size_t Read(size_t n, T* p) { + if (n == 0) { + return 0; + } + + std::unique_lock lock(mutex_); + size_t finished = Read(n, p, lock); + Notify(); + return finished; + } + + // blocking operation + bool Put(T&& val) { return WriteMove(1, &val) != 0; } + + // blocking operation + bool Put(const T& val) { return Write(1, &val) != 0; } + + // blocking operation + // returns value less than n if the channel is closed + size_t Write(size_t n, const T* p) { + if (n == 0) { + return 0; + } + std::unique_lock lock(mutex_); + size_t finished = Write(n, p, lock); + Notify(); + return finished; + } + + // WriteMove() will clear original contents of input array + size_t WriteMove(size_t n, T* p) { + if (n == 0) { + return 0; + } + std::unique_lock lock(mutex_); + size_t finished = WriteMove(n, p, lock); + Notify(); + return finished; + } + + // read data of block size from channel to vector + size_t Read(std::vector& p) { // NOLINT + p.resize(block_size_); + size_t finished = Read(p.size(), &p[0]); + p.resize(finished); + return finished; + } + + size_t ReadAll(std::vector& p) { // NOLINT + p.clear(); + size_t finished = 0; + size_t n = 0; + do { + // _block_size may change anytime + n = block_size_; + p.resize(finished + n); + n = Read(n, &p[finished]); 
+ finished += n; + } while (n != 0); + p.resize(finished); + return finished; + } + + // Writes every element of p to the channel by copy (blocking). + // p.data() is used instead of &p[0] so that an empty vector is never + // indexed; Write(0, ...) then simply returns 0. + size_t Write(const std::vector& p) { return Write(p.size(), p.data()); } + + // Same as above, but the elements are moved out of p. + size_t Write(std::vector&& p) { return WriteMove(p.size(), p.data()); } + + private: + size_t capacity_ = MaxCapacity(); + size_t block_size_ = 1024; + bool closed_ = false; + std::mutex mutex_; + // use deque to store data + std::deque data_; + size_t reading_count_ = 0; + int empty_waiters_ = 0; + int full_waiters_ = 0; + std::condition_variable empty_cond_; + std::condition_variable full_cond_; + + static constexpr size_t MaxCapacity() { + return std::numeric_limits::max() / 2; + } + + // Wakes one waiting reader if there is data (or the channel is closed) and + // one waiting writer if there is room (or the channel is closed). + void Notify() { + if (empty_waiters_ != 0 && (!EmptyUnlocked() || closed_)) { + empty_cond_.notify_one(); + } + if (full_waiters_ != 0 && (!FullUnlocked() || closed_)) { + full_cond_.notify_one(); + } + } + + bool EmptyUnlocked() { return data_.empty(); } + + // Pending reads (reading_count_) count as extra room on top of capacity_. + bool FullUnlocked() { return data_.size() >= capacity_ + reading_count_; } + + // Blocks on empty_cond_ while the queue is empty and the channel is open. + // Returns true if there is data to read afterwards. + bool WaitForRead(std::unique_lock& lock) { // NOLINT +#ifdef _LINUX + while (unlikely(EmptyUnlocked() && !closed_)) { +#else + while (EmptyUnlocked() && !closed_) { +#endif + if (full_waiters_ != 0) { + full_cond_.notify_one(); + } + empty_waiters_++; + empty_cond_.wait(lock); + empty_waiters_--; + } + return !EmptyUnlocked(); + } + + // Blocks on full_cond_ while the queue is full and the channel is open. + // Returns false once the channel has been closed. + bool WaitForWrite(std::unique_lock& lock) { // NOLINT +#ifdef _LINUX + while (unlikely(FullUnlocked() && !closed_)) { +#else + while (FullUnlocked() && !closed_) { +#endif + if (empty_waiters_ != 0) { + empty_cond_.notify_one(); + } + full_waiters_++; + full_cond_.wait(lock); + full_waiters_--; + } + return !closed_; + } + + size_t Read(size_t n, T* p, std::unique_lock& lock) { // NOLINT + size_t finished = 0; + CHECK(n <= MaxCapacity() - reading_count_); + reading_count_ += n; + while (finished < n && WaitForRead(lock)) { + size_t m = std::min(n - finished, data_.size()); + for (size_t 
i = 0; i < m; i++) { + p[finished++] = std::move(data_.front()); + data_.pop_front(); + } + reading_count_ -= m; + } + reading_count_ -= n - finished; + return finished; + } + + size_t Write(size_t n, + const T* p, // NOLINT + std::unique_lock& lock) { // NOLINT + size_t finished = 0; + while (finished < n && WaitForWrite(lock)) { + size_t m = + std::min(n - finished, capacity_ + reading_count_ - data_.size()); + for (size_t i = 0; i < m; i++) { + data_.push_back(p[finished++]); + } + } + return finished; + } + + size_t WriteMove(size_t n, + T* p, // NOLINT + std::unique_lock& lock) { // NOLINT + size_t finished = 0; + while (finished < n && WaitForWrite(lock)) { + size_t m = + std::min(n - finished, capacity_ + reading_count_ - data_.size()); + for (size_t i = 0; i < m; i++) { + data_.push_back(std::move(p[finished++])); + } + } + return finished; + } +}; // NOLINT + +template +using Channel = std::shared_ptr>; + +template +Channel MakeChannel(size_t capacity = std::numeric_limits::max()) { + return std::make_shared>(capacity); +} + +template +Channel MakeChannel(const Channel& other) { + CHECK(other != nullptr) << "channel can not be NULL"; + Channel chan = std::make_shared>(); + chan->InheritFrom(other); + return chan; +} + +// NOTE: ChannelReader wraps a channel with a buffer for fast reads. It reads +// a whole block of data from the channel at a time, while the user takes the +// items one by one. The user must therefore keep calling operator>> until it +// fails, or call GetBufferRemain until it returns false, to make sure that all +// buffered data has been read. 
+template +class ChannelReader { + public: + // NOTE(review): the default argument is nullptr, but Reset() CHECKs that the + // channel is not null, so default construction aborts - confirm the intent. + explicit ChannelReader(ChannelObject* channel = nullptr) { + Reset(channel); + } + + ~ChannelReader() { CHECK(cursor_ == 0) << "Forgot to read buffer data"; } + + ChannelObject* channel() { return channel_; } + + void Reset(ChannelObject* channel) { + CHECK(channel != nullptr) << "Channel can not be nullptr"; + channel_ = channel; + cursor_ = 0; + failed_ = !channel; + } + + // true as long as no read has failed + operator bool() { return !failed_; } + + // Hands out the next buffered item. When the buffer is used up, it is + // refilled with one block through ChannelObject::Read(std::vector&) (the + // channel has no lower-case read()); a refill that yields nothing marks the + // reader as failed. + ChannelReader& operator>>(T& val) { + if (failed_) { + return *this; + } + if (cursor_ >= buffer_.size()) { + cursor_ = 0; + if (channel_->Read(buffer_) == 0) { + failed_ = true; + return *this; + } + } + val = std::move(buffer_[cursor_++]); + return *this; + } + + // Drains what is left in the buffer without touching the channel. Returns + // false (and rewinds the cursor) once the buffer is used up. + bool GetBufferRemain(T& val) { // NOLINT + if (cursor_ >= buffer_.size()) { + cursor_ = 0; + return false; + } + val = std::move(buffer_[cursor_++]); + return true; + } + + private: + ChannelObject* channel_ = nullptr; + std::vector buffer_; + size_t cursor_ = 0; + bool failed_ = true; +}; // NOLINT + +template +class ChannelWriter { + public: + explicit ChannelWriter(ChannelObject* channel = nullptr) { + Reset(channel); + } + + ~ChannelWriter() { CHECK(buffer_.empty()) << "Forgot to flush"; } + + ChannelObject* channel() { return channel_; } + + void Reset(ChannelObject* channel) { + CHECK(buffer_.empty()) << "Forgot to flush"; + CHECK(channel != nullptr) << "Channel can not be nullptr"; + channel_ = channel; + buffer_.clear(); + failed_ = !channel; + } + + // true as long as no write has failed + operator bool() { return !failed_; } + + ChannelWriter& operator<<(T&& val) { + if (failed_) { + return *this; + } + buffer_.push_back(std::move(val)); + if (buffer_.size() >= channel_->BlockSize()) { + Flush(); + } + return *this; + } + + ChannelWriter& operator<<(const T& val) { + if (failed_) { + return *this; + } + buffer_.push_back(val); + if (buffer_.size() >= channel_->BlockSize()) { + Flush(); + } + return *this; + 
} + + void Flush() { + if (failed_ || buffer_.empty()) { + buffer_.clear(); + return; + } + failed_ |= + channel_->WriteMove(buffer_.size(), &buffer_[0]) != buffer_.size(); + buffer_.clear(); + } + + private: + ChannelObject* channel_ = nullptr; + std::vector buffer_; + bool failed_ = true; +}; // NOLINT + +// only used for range-for loop +// for (auto& x : chan) {...} +template +struct ChannelIterator { + std::shared_ptr> reader_; + T data_; + + void operator++() { + CHECK(reader_ != nullptr) << "reader can not be NULL"; + if (!(*reader_ >> data_)) { + reader_ = nullptr; + } + } + + T& operator*() { return data_; } + + friend bool operator==(const ChannelIterator& a, + const ChannelIterator& b) { + return a.reader_ == b.reader_; + } + + friend bool operator!=(const ChannelIterator& a, + const ChannelIterator& b) { + return a.reader_ != b.reader_; + } +}; // NOLINT + +template +ChannelIterator begin(ChannelObject* chan) { + ChannelIterator it{std::make_shared>(chan), T()}; + ++it; + return it; +} + +template +ChannelIterator end(ChannelObject* chan) { + return {nullptr, T()}; +} + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index e89f3f1a4e06f1ea94a2050f03ebc6c58c591625..697a38398314fb6fccce13a9b1aed2b9e7468ec1 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -160,81 +160,82 @@ template class PrivateQueueDataFeed>; template InMemoryDataFeed::InMemoryDataFeed() { - cur_channel_ = 0; - shuffled_ins_ = std::make_shared>(); - shuffled_ins_out_ = std::make_shared>(); - fleet_send_batch_size_ = 80000; // hard code here - memory_data_ = nullptr; - mutex_for_update_memory_data_ = nullptr; this->file_idx_ = nullptr; this->mutex_for_pick_file_ = nullptr; - fleet_send_sleep_seconds_ = 2; + this->fp_ = nullptr; + this->thread_id_ = 0; + this->thread_num_ = 1; + this->input_channel_ = nullptr; + this->output_channel_ = nullptr; + 
this->consume_channel_ = nullptr; } template bool InMemoryDataFeed::Start() { #ifdef _LINUX - DataFeed::CheckSetFileList(); - if (shuffled_ins_->Size() == 0 && shuffled_ins_out_->Size() == 0) { - FillMemoryDataToChannel(); + this->CheckSetFileList(); + if (output_channel_->Size() == 0 && input_channel_->Size() != 0) { + std::vector data; + input_channel_->Read(data); + output_channel_->Write(std::move(data)); } #endif - DataFeed::finish_start_ = true; + this->finish_start_ = true; return true; } template int InMemoryDataFeed::Next() { #ifdef _LINUX - DataFeed::CheckStart(); - std::shared_ptr> in_channel = nullptr; - std::shared_ptr> out_channel = nullptr; - if (cur_channel_ == 0) { - in_channel = shuffled_ins_; - out_channel = shuffled_ins_out_; - } else { - in_channel = shuffled_ins_out_; - out_channel = shuffled_ins_; - } - CHECK(in_channel != nullptr); - CHECK(out_channel != nullptr); - VLOG(3) << "in_channel size=" << in_channel->Size() - << ", out_channel size=" << out_channel->Size() + this->CheckStart(); + CHECK(output_channel_ != nullptr); + CHECK(consume_channel_ != nullptr); + VLOG(3) << "output_channel_ size=" << output_channel_->Size() + << ", consume_channel_ size=" << consume_channel_->Size() << ", thread_id=" << thread_id_; int index = 0; T instance; - T ins_vec; - while (index < DataFeed::default_batch_size_) { - if (in_channel->Size() == 0) { + std::vector ins_vec; + ins_vec.reserve(this->default_batch_size_); + while (index < this->default_batch_size_) { + if (output_channel_->Size() == 0) { break; } - in_channel->Pop(&instance); - - AddInstanceToInsVec(&ins_vec, instance, index++); - out_channel->Push(std::move(instance)); + output_channel_->Get(instance); + ins_vec.push_back(instance); + ++index; + consume_channel_->Put(std::move(instance)); } - DataFeed::batch_size_ = index; - VLOG(3) << "batch_size_=" << DataFeed::batch_size_ + this->batch_size_ = index; + VLOG(3) << "batch_size_=" << this->batch_size_ << ", thread_id=" << thread_id_; - if 
(DataFeed::batch_size_ != 0) { + if (this->batch_size_ != 0) { PutToFeedVec(ins_vec); } else { - cur_channel_ = 1 - cur_channel_; + VLOG(3) << "finish reading, output_channel_ size=" + << output_channel_->Size() + << ", consume_channel_ size=" << consume_channel_->Size() + << ", thread_id=" << thread_id_; } - return DataFeed::batch_size_; + return this->batch_size_; #else return 0; #endif } template -void InMemoryDataFeed::SetMemoryData(void* memory_data) { - memory_data_ = static_cast*>(memory_data); +void InMemoryDataFeed::SetInputChannel(void* channel) { + input_channel_ = static_cast*>(channel); +} + +template +void InMemoryDataFeed::SetOutputChannel(void* channel) { + output_channel_ = static_cast*>(channel); } template -void InMemoryDataFeed::SetMemoryDataMutex(std::mutex* mutex) { - mutex_for_update_memory_data_ = mutex; +void InMemoryDataFeed::SetConsumeChannel(void* channel) { + consume_channel_ = static_cast*>(channel); } template @@ -247,213 +248,38 @@ void InMemoryDataFeed::SetThreadNum(int thread_num) { thread_num_ = thread_num; } -template -void InMemoryDataFeed::SetTrainerNum(int trainer_num) { - trainer_num_ = trainer_num; -} - -template -void InMemoryDataFeed::SetFleetSendBatchSize(int64_t size) { - fleet_send_batch_size_ = size; -} - -template -void InMemoryDataFeed::PutInsToChannel(const std::string& ins_str) { -#ifdef _LINUX - std::vector ins; - DeserializeIns(&ins, ins_str); - shuffled_ins_->Extend(std::move(ins)); - VLOG(3) << "PutInsToChannel put ins num=" << ins.size() - << " to channel, channel size=" << shuffled_ins_->Size() - << " thread_id=" << thread_id_; -#endif -} - -template -void InMemoryDataFeed::FillMemoryDataToChannel() { -#ifdef _LINUX - VLOG(3) << "FillMemoryDataToChannel, thread_id=" << thread_id_; - auto interval = GetMemoryDataInterval(); - VLOG(3) << "memory data size=" << memory_data_->size() - << ", fill data from [" << interval.first << ", " << interval.second - << "), thread_id=" << thread_id_; - for (int64_t i = 
interval.first; i < interval.second; ++i) { - T& t = (*memory_data_)[i]; - shuffled_ins_->Push(std::move(t)); - } -#endif -} - -template -void InMemoryDataFeed::FillChannelToMemoryData() { -#ifdef _LINUX - VLOG(3) << "FillChannelToMemoryData, thread_id=" << thread_id_; - std::vector local_vec; - std::shared_ptr> channel = nullptr; - std::shared_ptr> pre_channel = nullptr; - if (cur_channel_ == 0) { - channel = shuffled_ins_; - pre_channel = shuffled_ins_out_; - } else { - channel = shuffled_ins_out_; - pre_channel = shuffled_ins_; - } - CHECK(channel != nullptr); - CHECK(pre_channel != nullptr); - CHECK_EQ(pre_channel->Size(), 0); - local_vec.resize(channel->Size()); - for (int64_t i = 0; i < local_vec.size(); ++i) { - channel->Pop(&local_vec[i]); - } - VLOG(3) << "local_vec size=" << local_vec.size() - << ", thread_id=" << thread_id_; - { - std::lock_guard g(*mutex_for_update_memory_data_); - VLOG(3) << "before insert, memory_data_ size=" << memory_data_->size() - << ", thread_id=" << thread_id_; - memory_data_->insert(memory_data_->end(), local_vec.begin(), - local_vec.end()); - VLOG(3) << "after insert memory_data_ size=" << memory_data_->size() - << ", thread_id=" << thread_id_; - } - std::vector().swap(local_vec); -#endif -} - template void InMemoryDataFeed::LoadIntoMemory() { #ifdef _LINUX VLOG(3) << "LoadIntoMemory() begin, thread_id=" << thread_id_; - std::vector local_vec; std::string filename; - while (DataFeed::PickOneFile(&filename)) { + while (this->PickOneFile(&filename)) { VLOG(3) << "PickOneFile, filename=" << filename << ", thread_id=" << thread_id_; int err_no = 0; - PrivateQueueDataFeed::fp_ = - fs_open_read(filename, &err_no, PrivateQueueDataFeed::pipe_command_); - CHECK(PrivateQueueDataFeed::fp_ != nullptr); - __fsetlocking(&*PrivateQueueDataFeed::fp_, FSETLOCKING_BYCALLER); + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); + CHECK(this->fp_ != nullptr); + __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); + 
paddle::framework::ChannelWriter writer(input_channel_); T instance; platform::Timer timeline; timeline.Start(); while (ParseOneInstanceFromPipe(&instance)) { - local_vec.push_back(instance); + writer << std::move(instance); + instance = T(); } + writer.Flush(); timeline.Pause(); VLOG(3) << "LoadIntoMemory() read all lines, file=" << filename << ", cost time=" << timeline.ElapsedSec() << " seconds, thread_id=" << thread_id_; - { - std::lock_guard lock(*mutex_for_update_memory_data_); - timeline.Start(); - memory_data_->insert(memory_data_->end(), - std::make_move_iterator(local_vec.begin()), - std::make_move_iterator(local_vec.end())); - timeline.Pause(); - VLOG(3) << "LoadIntoMemory() memory_data insert, cost time=" - << timeline.ElapsedSec() << " seconds, thread_id=" << thread_id_; - } - local_vec.clear(); } - std::vector().swap(local_vec); VLOG(3) << "LoadIntoMemory() end, thread_id=" << thread_id_; #endif } -template -void InMemoryDataFeed::LocalShuffle() { -#ifdef _LINUX - VLOG(3) << "LocalShuffle() begin, thread_id=" << thread_id_; - FillMemoryDataToChannel(); - VLOG(3) << "LocalShuffle() end, thread_id=" << thread_id_; -#endif -} - -template -void InMemoryDataFeed::GlobalShuffle() { -#ifdef _LINUX - VLOG(3) << "GlobalShuffle() begin, thread_id=" << thread_id_; - auto fleet_ptr = FleetWrapper::GetInstance(); - std::vector> send_vec(trainer_num_); - std::vector send_index(trainer_num_); - uint64_t reserve_len = fleet_send_batch_size_ / trainer_num_ + 1; - for (auto& vec : send_vec) { - vec.reserve(reserve_len); - } - for (int i = 0; i < trainer_num_; ++i) { - send_index[i] = i; - } - std::vector> total_status; - auto interval = GetMemoryDataInterval(); - VLOG(3) << "global shuffle data from [" << interval.first << ", " - << interval.second << "), thread_id=" << thread_id_; - - for (int64_t i = interval.first; i < interval.second; - i += fleet_send_batch_size_) { - for (int64_t j = 0; j < fleet_send_batch_size_ && i + j < interval.second; - ++j) { - int64_t 
random_num = fleet_ptr->LocalRandomEngine()(); - int64_t node_id = random_num % trainer_num_; - send_vec[node_id].push_back(&((*memory_data_)[i + j])); - } - total_status.clear(); - std::shuffle(send_index.begin(), send_index.end(), - fleet_ptr->LocalRandomEngine()); - for (int index = 0; index < send_index.size(); ++index) { - int j = send_index[index]; - if (send_vec[j].size() == 0) { - continue; - } - std::string send_str; - SerializeIns(send_vec[j], &send_str); - auto ret = fleet_ptr->SendClientToClientMsg(0, j, send_str); - total_status.push_back(std::move(ret)); - send_vec[j].clear(); - } - for (auto& t : total_status) { - t.wait(); - } - sleep(fleet_send_sleep_seconds_); - } - VLOG(3) << "GlobalShuffle() end, thread_id=" << thread_id_; -#endif -} - -template -std::pair InMemoryDataFeed::GetMemoryDataInterval() { - int64_t start = 0; - int64_t end = 0; - int64_t size = memory_data_->size(); - for (int64_t i = 0; i <= static_cast(thread_id_); ++i) { - int64_t len = size / static_cast(thread_num_) + - (i < (size % static_cast(thread_num_))); - start = end; - end += len; - } - return std::make_pair(start, end); -} - -template -int64_t InMemoryDataFeed::GetChannelDataSize() { - if (cur_channel_ == 0) { - return shuffled_ins_->Size(); - } else { - return shuffled_ins_out_->Size(); - } -} - -template -void InMemoryDataFeed::ReleaseChannelData() { - if (cur_channel_ == 0) { - shuffled_ins_->Clear(); - } else { - shuffled_ins_out_->Clear(); - } -} - // explicit instantiation -template class InMemoryDataFeed>; +template class InMemoryDataFeed; void MultiSlotDataFeed::Init( const paddle::framework::DataFeedDesc& data_feed_desc) { @@ -807,7 +633,6 @@ void MultiSlotInMemoryDataFeed::Init( paddle::framework::MultiSlotDesc multi_slot_desc = data_feed_desc.multi_slot_desc(); SetBatchSize(data_feed_desc.batch_size()); - SetQueueSize(data_feed_desc.batch_size()); size_t all_slot_num = multi_slot_desc.slots_size(); all_slots_.resize(all_slot_num); 
all_slots_type_.resize(all_slot_num); @@ -848,17 +673,13 @@ void MultiSlotInMemoryDataFeed::Init( finish_init_ = true; } -bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( - std::vector* instance) { +bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) { #ifdef _LINUX thread_local string::LineFileReader reader; if (!reader.getline(&*(fp_.get()))) { return false; } else { - int use_slots_num = use_slots_.size(); - instance->resize(use_slots_num); - const char* str = reader.get(); std::string line = std::string(str); // VLOG(3) << line; @@ -875,16 +696,27 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( "characters.\nplease check this error line: %s", str); if (idx != -1) { - (*instance)[idx].Init(all_slots_type_[i]); - if ((*instance)[idx].GetType()[0] == 'f') { // float + if (all_slots_type_[i][0] == 'f') { // float for (int j = 0; j < num; ++j) { float feasign = strtof(endptr, &endptr); - (*instance)[idx].AddValue(feasign); + // if float feasign is equal to zero, ignore it + if (fabs(feasign) < 1e-6) { + continue; + } + FeatureKey f; + f.float_feasign_ = feasign; + instance->float_feasigns_.push_back(FeatureItem(f, idx)); } - } else if ((*instance)[idx].GetType()[0] == 'u') { // uint64 + } else if (all_slots_type_[i][0] == 'u') { // uint64 for (int j = 0; j < num; ++j) { uint64_t feasign = (uint64_t)strtoull(endptr, &endptr, 10); - (*instance)[idx].AddValue(feasign); + // if uint64 feasign is equal to zero, ignore it + if (feasign == 0) { + continue; + } + FeatureKey f; + f.uint64_feasign_ = feasign; + instance->uint64_feasigns_.push_back(FeatureItem(f, idx)); } } pos = endptr - str; @@ -897,6 +729,8 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( } } } + instance->float_feasigns_.shrink_to_fit(); + instance->uint64_feasigns_.shrink_to_fit(); return true; } #else @@ -904,13 +738,10 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( #endif } -bool MultiSlotInMemoryDataFeed::ParseOneInstance( - 
std::vector* instance) { +bool MultiSlotInMemoryDataFeed::ParseOneInstance(Record* instance) { #ifdef _LINUX std::string line; if (getline(file_, line)) { - int use_slots_num = use_slots_.size(); - instance->resize(use_slots_num); VLOG(3) << line; // parse line const char* str = line.c_str(); @@ -928,16 +759,25 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance( str); if (idx != -1) { - (*instance)[idx].Init(all_slots_type_[i]); - if ((*instance)[idx].GetType()[0] == 'f') { // float + if (all_slots_type_[i][0] == 'f') { // float for (int j = 0; j < num; ++j) { float feasign = strtof(endptr, &endptr); - (*instance)[idx].AddValue(feasign); + if (fabs(feasign) < 1e-6) { + continue; + } + FeatureKey f; + f.float_feasign_ = feasign; + instance->float_feasigns_.push_back(FeatureItem(f, idx)); } - } else if ((*instance)[idx].GetType()[0] == 'u') { // uint64 + } else if (all_slots_type_[i][0] == 'u') { // uint64 for (int j = 0; j < num; ++j) { uint64_t feasign = (uint64_t)strtoull(endptr, &endptr, 10); - (*instance)[idx].AddValue(feasign); + if (feasign == 0) { + continue; + } + FeatureKey f; + f.uint64_feasign_ = feasign; + instance->uint64_feasigns_.push_back(FeatureItem(f, idx)); } } pos = endptr - str; @@ -947,6 +787,9 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance( } } } + instance->float_feasigns_.shrink_to_fit(); + instance->uint64_feasigns_.shrink_to_fit(); + return true; } else { return false; } @@ -954,46 +797,64 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance( return false; } -void MultiSlotInMemoryDataFeed::AddInstanceToInsVec( - std::vector* ins_vec, - const std::vector& instance, int index) { +void MultiSlotInMemoryDataFeed::PutToFeedVec( + const std::vector& ins_vec) { #ifdef _LINUX - if (index == 0) { - ins_vec->resize(instance.size()); - for (size_t i = 0; i < instance.size(); ++i) { - (*ins_vec)[i].Init(instance[i].GetType()); - (*ins_vec)[i].InitOffset(); + std::vector> batch_float_feasigns(use_slots_.size(), + std::vector()); + std::vector> 
batch_uint64_feasigns( + use_slots_.size(), std::vector()); + std::vector> offset(use_slots_.size(), + std::vector{0}); + std::vector visit(use_slots_.size(), false); + for (size_t i = 0; i < ins_vec.size(); ++i) { + auto& r = ins_vec[i]; + for (auto& item : r.float_feasigns_) { + batch_float_feasigns[item.slot()].push_back(item.sign().float_feasign_); + visit[item.slot()] = true; + } + for (auto& item : r.uint64_feasigns_) { + batch_uint64_feasigns[item.slot()].push_back(item.sign().uint64_feasign_); + visit[item.slot()] = true; + } + for (size_t j = 0; j < use_slots_.size(); ++j) { + const auto& type = all_slots_type_[j]; + if (visit[j]) { + visit[j] = false; + } else { + // fill slot value with default value 0 + if (type[0] == 'f') { // float + batch_float_feasigns[j].push_back(0.0); + } else if (type[0] == 'u') { // uint64 + batch_uint64_feasigns[j].push_back(0); + } + } + // get offset of this ins in this slot + if (type[0] == 'f') { // float + offset[j].push_back(batch_float_feasigns[j].size()); + } else if (type[0] == 'u') { // uint64 + offset[j].push_back(batch_uint64_feasigns[j].size()); + } } } - for (size_t i = 0; i < instance.size(); ++i) { - (*ins_vec)[i].AddIns(instance[i]); - } -#endif -} - -void MultiSlotInMemoryDataFeed::PutToFeedVec( - const std::vector& ins_vec) { -#ifdef _LINUX for (size_t i = 0; i < use_slots_.size(); ++i) { - const auto& type = ins_vec[i].GetType(); - const auto& offset = ins_vec[i].GetOffset(); - int total_instance = static_cast(offset.back()); - + int total_instance = offset[i].back(); + const auto& type = all_slots_type_[i]; if (type[0] == 'f') { // float - const auto& feasign = ins_vec[i].GetFloatData(); + float* feasign = batch_float_feasigns[i].data(); float* tensor_ptr = feed_vec_[i]->mutable_data( {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(float)); + memcpy(tensor_ptr, feasign, total_instance * sizeof(float)); } else if (type[0] == 'u') { // uint64 // no 
uint64_t type in paddlepaddle - const auto& feasign = ins_vec[i].GetUint64Data(); + uint64_t* feasign = batch_uint64_feasigns[i].data(); int64_t* tensor_ptr = feed_vec_[i]->mutable_data( {total_instance, 1}, platform::CPUPlace()); - memcpy(tensor_ptr, &feasign[0], total_instance * sizeof(int64_t)); + memcpy(tensor_ptr, feasign, total_instance * sizeof(int64_t)); } - - LoD data_lod{offset}; + auto& slot_offset = offset[i]; + LoD data_lod{slot_offset}; feed_vec_[i]->set_lod(data_lod); if (use_slots_is_dense_[i]) { if (inductive_shape_index_[i] != -1) { @@ -1006,19 +867,6 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec( #endif } -// todo serialize ins in global shuffle -void MultiSlotInMemoryDataFeed::SerializeIns( - const std::vector*>& ins, std::string* str) { - auto fleet_ptr = FleetWrapper::GetInstance(); - fleet_ptr->Serialize(ins, str); -} -// todo deserialize ins in global shuffle -void MultiSlotInMemoryDataFeed::DeserializeIns( - std::vector>* ins, const std::string& str) { - auto fleet_ptr = FleetWrapper::GetInstance(); - fleet_ptr->Deserialize(ins, str); -} - #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) template void PrivateInstantDataFeed::PutToFeedVec() { diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 7fea85601c4e3f884c0fc26b5d5197d3b09cdc96..93f14dd74f975dbad6483f894b8c17ff6e16a0d8 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -14,6 +14,11 @@ limitations under the License. */ #pragma once +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + #include #include // NOLINT #include @@ -24,7 +29,9 @@ limitations under the License. 
*/ #include #include +#include "paddle/fluid/framework/archive.h" #include "paddle/fluid/framework/blocking_queue.h" +#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/data_feed.pb.h" #include "paddle/fluid/framework/fleet/fleet_wrapper.h" #include "paddle/fluid/framework/lod_tensor.h" @@ -88,17 +95,15 @@ class DataFeed { virtual void AssignFeedVar(const Scope& scope); // This function will do nothing at default - virtual void SetMemoryData(void* memory_data) {} + virtual void SetInputChannel(void* channel) {} + // This function will do nothing at default + virtual void SetOutputChannel(void* channel) {} // This function will do nothing at default - virtual void SetMemoryDataMutex(std::mutex* mutex) {} + virtual void SetConsumeChannel(void* channel) {} // This function will do nothing at default virtual void SetThreadId(int thread_id) {} // This function will do nothing at default virtual void SetThreadNum(int thread_num) {} - // This function will do nothing at default - virtual void SetTrainerNum(int trainer_num) {} - // This function will do nothing at default - virtual void SetFleetSendBatchSize(int64_t size) {} virtual void SetFileListMutex(std::mutex* mutex) { mutex_for_pick_file_ = mutex; } @@ -106,21 +111,6 @@ class DataFeed { virtual void LoadIntoMemory() { PADDLE_THROW("This function(LoadIntoMemory) is not implemented."); } - virtual void LocalShuffle() { - PADDLE_THROW("This function(LocalShuffle) is not implemented."); - } - virtual void GlobalShuffle() { - PADDLE_THROW("This function(GlobalShuffle) is not implemented."); - } - // This function will do nothing at default - virtual void FillMemoryDataToChannel() {} - // This function will do nothing at default - virtual void FillChannelToMemoryData() {} - // This function will do nothing at default - virtual void PutInsToChannel(const std::string& ins_str) {} - virtual int64_t GetChannelDataSize() { return 0; } - // This function will do nothing at default - virtual void 
ReleaseChannelData() {} protected: // The following three functions are used to check if it is executed in this @@ -212,54 +202,32 @@ class PrivateQueueDataFeed : public DataFeed { }; template -class InMemoryDataFeed : public PrivateQueueDataFeed { +class InMemoryDataFeed : public DataFeed { public: InMemoryDataFeed(); virtual ~InMemoryDataFeed() {} virtual void Init(const DataFeedDesc& data_feed_desc) = 0; virtual bool Start(); virtual int Next(); - virtual void SetMemoryData(void* memory_data); - virtual void SetMemoryDataMutex(std::mutex* mutex); + virtual void SetInputChannel(void* channel); + virtual void SetOutputChannel(void* channel); + virtual void SetConsumeChannel(void* channel); virtual void SetThreadId(int thread_id); virtual void SetThreadNum(int thread_num); - virtual void SetTrainerNum(int trainer_num); - virtual void SetFleetSendBatchSize(int64_t size); - virtual void PutInsToChannel(const std::string& ins_str); - virtual void FillMemoryDataToChannel(); - virtual void FillChannelToMemoryData(); virtual void LoadIntoMemory(); - virtual void LocalShuffle(); - virtual void GlobalShuffle(); - virtual int64_t GetChannelDataSize(); - virtual void ReleaseChannelData(); protected: - virtual void AddInstanceToInsVec(T* vec_ins, const T& instance, - int index) = 0; virtual bool ParseOneInstance(T* instance) = 0; virtual bool ParseOneInstanceFromPipe(T* instance) = 0; - virtual void PutToFeedVec(const T& ins_vec) = 0; - virtual void SerializeIns(const std::vector& ins, std::string* str) = 0; - virtual void DeserializeIns(std::vector* ins, const std::string& str) = 0; - virtual std::pair GetMemoryDataInterval(); + virtual void PutToFeedVec(const std::vector& ins_vec) = 0; int thread_id_; int thread_num_; - int trainer_num_; - uint32_t rand_seed; - std::vector* memory_data_; - std::mutex* mutex_for_update_memory_data_; - // when read ins, we put ins from one channel to the other, - // and when finish reading, we set cur_channel = 1 - cur_channel, - // so if 
cur_channel=0, all data are in shuffled_ins_, else shuffled_ins_out_ - int cur_channel_; - std::shared_ptr> shuffled_ins_; - std::shared_ptr> shuffled_ins_out_; - int64_t fleet_send_batch_size_; - // sleep after send is to slow down sending data, but it's trick, - // should be removed later. - int64_t fleet_send_sleep_seconds_; + std::ifstream file_; + std::shared_ptr fp_; + paddle::framework::ChannelObject* input_channel_; + paddle::framework::ChannelObject* output_channel_; + paddle::framework::ChannelObject* consume_channel_; }; // This class define the data type of instance(ins_vec) in MultiSlotDataFeed @@ -381,6 +349,126 @@ class MultiSlotType { std::vector offset_; }; +template +paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, + const MultiSlotType& ins) { + ar << ins.GetType(); +#ifdef _LINUX + ar << ins.GetOffset(); +#else + const auto& offset = ins.GetOffset(); + ar << (uint64_t)offset.size(); + for (const size_t& x : offset) { + ar << (const uint64_t)x; + } +#endif + ar << ins.GetFloatData(); + ar << ins.GetUint64Data(); + return ar; +} + +template +paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, + MultiSlotType& ins) { + ar >> ins.MutableType(); +#ifdef _LINUX + ar >> ins.MutableOffset(); +#else + auto& offset = ins.MutableOffset(); + offset.resize(ar.template Get()); + for (size_t& x : offset) { + uint64_t t; + ar >> t; + x = (size_t)t; + } +#endif + ar >> ins.MutableFloatData(); + ar >> ins.MutableUint64Data(); + return ar; +} + +union FeatureKey { + uint64_t uint64_feasign_; + float float_feasign_; +}; + +struct FeatureItem { + FeatureItem() {} + FeatureItem(FeatureKey sign, uint16_t slot) { + this->sign() = sign; + this->slot() = slot; + } + FeatureKey& sign() { return *(reinterpret_cast(sign_buffer())); } + const FeatureKey& sign() const { + const FeatureKey* ret = reinterpret_cast(sign_buffer()); + return *ret; + } + uint16_t& slot() { return slot_; } + const uint16_t& slot() const { return slot_; } + + 
private: + char* sign_buffer() const { return const_cast(sign_); } + char sign_[sizeof(FeatureKey)]; + uint16_t slot_; +}; + +// sizeof Record is much less than std::vector +struct Record { + std::vector uint64_feasigns_; + std::vector float_feasigns_; + std::string ins_id_; +}; + +template +paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, + const FeatureKey& fk) { + ar << fk.uint64_feasign_; + ar << fk.float_feasign_; + return ar; +} + +template +paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, + FeatureKey& fk) { + ar >> fk.uint64_feasign_; + ar >> fk.float_feasign_; + return ar; +} + +template +paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, + const FeatureItem& fi) { + ar << fi.sign(); + ar << fi.slot(); + return ar; +} + +template +paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, + FeatureItem& fi) { + ar >> fi.sign(); + ar >> fi.slot(); + return ar; +} + +template +paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, + const Record& r) { + ar << r.uint64_feasigns_; + ar << r.float_feasigns_; + ar << r.ins_id_; + return ar; +} + +template +paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, + Record& r) { + ar >> r.uint64_feasigns_; + ar >> r.float_feasigns_; + ar >> r.ins_id_; + return ar; +} + // This DataFeed is used to feed multi-slot type data. // The format of multi-slot type data: // [n feasign_0 feasign_1 ... 
feasign_n]* @@ -391,7 +479,6 @@ class MultiSlotDataFeed virtual ~MultiSlotDataFeed() {} virtual void Init(const DataFeedDesc& data_feed_desc); virtual bool CheckFile(const char* filename); - // virtual void ReadThread(); protected: virtual void ReadThread(); @@ -403,24 +490,16 @@ class MultiSlotDataFeed virtual void PutToFeedVec(const std::vector& ins_vec); }; -class MultiSlotInMemoryDataFeed - : public InMemoryDataFeed> { +class MultiSlotInMemoryDataFeed : public InMemoryDataFeed { public: MultiSlotInMemoryDataFeed() {} virtual ~MultiSlotInMemoryDataFeed() {} virtual void Init(const DataFeedDesc& data_feed_desc); protected: - virtual void AddInstanceToInsVec(std::vector* vec_ins, - const std::vector& instance, - int index); - virtual bool ParseOneInstance(std::vector* instance); - virtual bool ParseOneInstanceFromPipe(std::vector* instance); - virtual void PutToFeedVec(const std::vector& ins_vec); - virtual void SerializeIns(const std::vector*>& ins, - std::string* str); - virtual void DeserializeIns(std::vector>* ins, - const std::string& str); + virtual bool ParseOneInstance(Record* instance); + virtual bool ParseOneInstanceFromPipe(Record* instance); + virtual void PutToFeedVec(const std::vector& ins_vec); }; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 1b3edeed10352c6abc7cfadadbe16c1aa4e32078..119794e592c12b140ea3f5f8794839fb120cf42b 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -32,9 +32,14 @@ namespace framework { // constructor template DatasetImpl::DatasetImpl() { + VLOG(3) << "DatasetImpl::DatasetImpl() constructor"; thread_num_ = 1; trainer_num_ = 1; + channel_num_ = 1; file_idx_ = 0; + cur_channel_ = 0; + fleet_send_batch_size_ = 80000; + fleet_send_sleep_seconds_ = 2; } // set filelist, file_idx_ will reset to zero. 
@@ -58,10 +63,6 @@ void DatasetImpl::SetThreadNum(int thread_num) { template void DatasetImpl::SetTrainerNum(int trainer_num) { trainer_num_ = trainer_num; - // should inform reader of trainer_num directly - for (auto reader : readers_) { - reader->SetTrainerNum(trainer_num); - } } // if you run distributed, and want to do global shuffle, @@ -70,9 +71,6 @@ void DatasetImpl::SetTrainerNum(int trainer_num) { template void DatasetImpl::SetFleetSendBatchSize(int64_t size) { fleet_send_batch_size_ = size; - for (auto reader : readers_) { - reader->SetFleetSendBatchSize(size); - } } template @@ -92,12 +90,38 @@ void DatasetImpl::SetDataFeedDesc(const std::string& data_feed_desc_str) { &data_feed_desc_); } -// readers_.size() may not be equal to thread_num_, -// it changes when filelist_.size() < thread_num_ template -std::vector>& -DatasetImpl::GetReaders() { - return readers_; +void DatasetImpl::SetChannelNum(int channel_num) { + channel_num_ = channel_num; +} + +template +std::vector DatasetImpl::GetReaders() { + std::vector ret; + ret.reserve(readers_.size()); + for (auto i : readers_) { + ret.push_back(i.get()); + } + return ret; +} + +template +void DatasetImpl::CreateChannel() { + if (input_channel_ == nullptr) { + input_channel_ = paddle::framework::MakeChannel(); + } + if (multi_output_channel_.size() == 0) { + multi_output_channel_.reserve(channel_num_); + for (int i = 0; i < channel_num_; ++i) { + multi_output_channel_.push_back(paddle::framework::MakeChannel()); + } + } + if (multi_consume_channel_.size() == 0) { + multi_consume_channel_.reserve(channel_num_); + for (int i = 0; i < channel_num_; ++i) { + multi_consume_channel_.push_back(paddle::framework::MakeChannel()); + } + } } // if sent message between workers, should first call this function @@ -119,9 +143,6 @@ void DatasetImpl::LoadIntoMemory() { VLOG(3) << "DatasetImpl::LoadIntoMemory() begin"; platform::Timer timeline; timeline.Start(); - if (readers_.size() == 0) { - CreateReaders(); - } std::vector 
load_threads; for (int64_t i = 0; i < thread_num_; ++i) { load_threads.push_back(std::thread( @@ -130,20 +151,63 @@ void DatasetImpl::LoadIntoMemory() { for (std::thread& t : load_threads) { t.join(); } + input_channel_->Close(); + int64_t in_chan_size = input_channel_->Size(); + input_channel_->SetBlockSize(in_chan_size / thread_num_ + 1); timeline.Pause(); VLOG(3) << "DatasetImpl::LoadIntoMemory() end" - << ", memory data size=" << memory_data_.size() + << ", memory data size=" << input_channel_->Size() << ", cost time=" << timeline.ElapsedSec() << " seconds"; } +template +void DatasetImpl::PreLoadIntoMemory() { + VLOG(3) << "DatasetImpl::PreLoadIntoMemory() begin"; + preload_threads_.clear(); + for (int64_t i = 0; i < thread_num_; ++i) { + preload_threads_.push_back(std::thread( + &paddle::framework::DataFeed::LoadIntoMemory, readers_[i].get())); + } + VLOG(3) << "DatasetImpl::PreLoadIntoMemory() end"; +} + +template +void DatasetImpl::WaitPreLoadDone() { + VLOG(3) << "DatasetImpl::WaitPreLoadDone() begin"; + for (std::thread& t : preload_threads_) { + t.join(); + } + input_channel_->Close(); + int64_t in_chan_size = input_channel_->Size(); + input_channel_->SetBlockSize(in_chan_size / thread_num_ + 1); + VLOG(3) << "DatasetImpl::WaitPreLoadDone() end"; +} + // release memory data template void DatasetImpl::ReleaseMemory() { VLOG(3) << "DatasetImpl::ReleaseMemory() begin"; - std::vector().swap(memory_data_); - for (int i = 0; i < readers_.size(); ++i) { - readers_[i]->ReleaseChannelData(); + if (input_channel_) { + input_channel_->Clear(); + input_channel_ = nullptr; + } + for (size_t i = 0; i < multi_output_channel_.size(); ++i) { + if (!multi_output_channel_[i]) { + continue; + } + multi_output_channel_[i]->Clear(); + multi_output_channel_[i] = nullptr; } + std::vector>().swap(multi_output_channel_); + for (size_t i = 0; i < multi_consume_channel_.size(); ++i) { + if (!multi_consume_channel_[i]) { + continue; + } + multi_consume_channel_[i]->Clear(); + 
multi_consume_channel_[i] = nullptr; + } + std::vector>().swap(multi_consume_channel_); + std::vector>().swap(readers_); VLOG(3) << "DatasetImpl::ReleaseMemory() end"; } @@ -153,21 +217,22 @@ void DatasetImpl::LocalShuffle() { VLOG(3) << "DatasetImpl::LocalShuffle() begin"; platform::Timer timeline; timeline.Start(); - if (readers_.size() == 0) { - CreateReaders(); - } - // if it is not InMemory, memory_data_ is empty - std::random_shuffle(memory_data_.begin(), memory_data_.end()); - std::vector local_shuffle_threads; - for (int64_t i = 0; i < thread_num_; ++i) { - local_shuffle_threads.push_back(std::thread( - &paddle::framework::DataFeed::LocalShuffle, readers_[i].get())); - } - for (std::thread& t : local_shuffle_threads) { - t.join(); + if (!input_channel_ || input_channel_->Size() == 0) { + VLOG(3) << "DatasetImpl::LocalShuffle() end, no data to shuffle"; + return; } - std::vector().swap(memory_data_); + auto fleet_ptr = FleetWrapper::GetInstance(); + input_channel_->Close(); + std::vector data; + input_channel_->ReadAll(data); + std::shuffle(data.begin(), data.end(), fleet_ptr->LocalRandomEngine()); + input_channel_->Open(); + input_channel_->Write(std::move(data)); + data.clear(); + data.shrink_to_fit(); + input_channel_->Close(); + timeline.Pause(); VLOG(3) << "DatasetImpl::LocalShuffle() end, cost time=" << timeline.ElapsedSec() << " seconds"; @@ -178,23 +243,75 @@ void DatasetImpl::GlobalShuffle() { VLOG(3) << "DatasetImpl::GlobalShuffle() begin"; platform::Timer timeline; timeline.Start(); - if (readers_.size() == 0) { - CreateReaders(); - } auto fleet_ptr = FleetWrapper::GetInstance(); - // local shuffle all data before global shuffle - std::shuffle(memory_data_.begin(), memory_data_.end(), - fleet_ptr->LocalRandomEngine()); + + if (!input_channel_ || input_channel_->Size() == 0) { + VLOG(3) << "DatasetImpl::GlobalShuffle() end, no data to shuffle"; + return; + } + + // local shuffle + input_channel_->Close(); + std::vector data; + 
input_channel_->ReadAll(data); + std::shuffle(data.begin(), data.end(), fleet_ptr->LocalRandomEngine()); + input_channel_->Open(); + input_channel_->Write(std::move(data)); + data.clear(); + data.shrink_to_fit(); + + input_channel_->Close(); + input_channel_->SetBlockSize(fleet_send_batch_size_); + VLOG(3) << "DatasetImpl::GlobalShuffle() input_channel_ size " + << input_channel_->Size(); + + auto global_shuffle_func = [this]() { + auto fleet_ptr = FleetWrapper::GetInstance(); + std::vector data; + while (this->input_channel_->Read(data)) { + std::vector ars(this->trainer_num_); + for (auto& t : data) { + auto client_id = fleet_ptr->LocalRandomEngine()() % this->trainer_num_; + ars[client_id] << t; + } + std::vector> total_status; + std::vector send_index(this->trainer_num_); + for (int i = 0; i < this->trainer_num_; ++i) { + send_index[i] = i; + } + std::shuffle(send_index.begin(), send_index.end(), + fleet_ptr->LocalRandomEngine()); + for (auto index = 0u; index < this->trainer_num_; ++index) { + int i = send_index[index]; + if (ars[i].Length() == 0) { + continue; + } + std::string msg(ars[i].Buffer(), ars[i].Length()); + auto ret = fleet_ptr->SendClientToClientMsg(0, i, msg); + total_status.push_back(std::move(ret)); + } + for (auto& t : total_status) { + t.wait(); + } + ars.clear(); + ars.shrink_to_fit(); + data.clear(); + data.shrink_to_fit(); + sleep(this->fleet_send_sleep_seconds_); + } + }; + VLOG(3) << "start global shuffle threads"; std::vector global_shuffle_threads; for (int i = 0; i < thread_num_; ++i) { - global_shuffle_threads.push_back(std::thread( - &paddle::framework::DataFeed::GlobalShuffle, readers_[i].get())); + global_shuffle_threads.push_back(std::thread(global_shuffle_func)); } for (std::thread& t : global_shuffle_threads) { t.join(); } - std::vector().swap(memory_data_); + global_shuffle_threads.clear(); + global_shuffle_threads.shrink_to_fit(); + input_channel_->Clear(); timeline.Pause(); VLOG(3) << "DatasetImpl::GlobalShuffle() end, cost 
time=" << timeline.ElapsedSec() << " seconds"; @@ -203,78 +320,67 @@ void DatasetImpl::GlobalShuffle() { template void DatasetImpl::CreateReaders() { VLOG(3) << "Calling CreateReaders()"; - CHECK(thread_num_ > 0) << "thread_num should > 0"; - int file_cnt = filelist_.size(); - int memory_data_size = memory_data_.size(); - if (memory_data_size != 0 && thread_num_ > memory_data_size) { - VLOG(3) << "Dataset thread num = " << thread_num_ - << ", memory data size = " << memory_data_size - << ". Changing Dataset thread num = " << memory_data_size; - thread_num_ = memory_data_size; - } else if (file_cnt != 0 && thread_num_ > file_cnt) { - VLOG(3) << "Dataset thread num = " << thread_num_ - << ", file num = " << file_cnt - << ". Changing Dataset thread num = " << file_cnt; - thread_num_ = file_cnt; - } - VLOG(3) << "thread_num in Readers: " << thread_num_; + VLOG(3) << "thread num in Dataset: " << thread_num_; + VLOG(3) << "Filelist size in Dataset: " << filelist_.size(); + VLOG(3) << "channel num in Dataset: " << channel_num_; + CHECK(thread_num_ > 0) << "thread num should > 0"; + CHECK(thread_num_ <= filelist_.size()) + << "thread num should <= filelist size"; + CHECK(channel_num_ > 0) << "channel num should > 0"; + CHECK(channel_num_ <= thread_num_) << "channel num should <= thread num"; VLOG(3) << "readers size: " << readers_.size(); - VLOG(3) << "Filelist size in readers: " << filelist_.size(); if (readers_.size() != 0) { + VLOG(3) << "readers_.size() = " << readers_.size() + << ", will not create again"; return; } VLOG(3) << "data feed class name: " << data_feed_desc_.name(); + int channel_idx = 0; for (int i = 0; i < thread_num_; ++i) { readers_.push_back(DataFeedFactory::CreateDataFeed(data_feed_desc_.name())); - readers_.back()->Init(data_feed_desc_); - readers_.back()->SetMemoryData(&memory_data_); - readers_.back()->SetMemoryDataMutex(&mutex_for_update_memory_data_); - readers_.back()->SetThreadId(i); - readers_.back()->SetThreadNum(thread_num_); - 
readers_.back()->SetTrainerNum(trainer_num_); - readers_.back()->SetFileListMutex(&mutex_for_pick_file_); - readers_.back()->SetFileListIndex(&file_idx_); - readers_.back()->SetFileList(filelist_); + readers_[i]->Init(data_feed_desc_); + readers_[i]->SetThreadId(i); + readers_[i]->SetThreadNum(thread_num_); + readers_[i]->SetFileListMutex(&mutex_for_pick_file_); + readers_[i]->SetFileListIndex(&file_idx_); + readers_[i]->SetFileList(filelist_); + if (input_channel_ != nullptr) { + readers_[i]->SetInputChannel(input_channel_.get()); + } + if (cur_channel_ == 0 && channel_idx < multi_output_channel_.size()) { + readers_[i]->SetOutputChannel(multi_output_channel_[channel_idx].get()); + readers_[i]->SetConsumeChannel(multi_consume_channel_[channel_idx].get()); + } else if (channel_idx < multi_output_channel_.size()) { + readers_[i]->SetOutputChannel(multi_consume_channel_[channel_idx].get()); + readers_[i]->SetConsumeChannel(multi_output_channel_[channel_idx].get()); + } + ++channel_idx; + if (channel_idx >= channel_num_) { + channel_idx = 0; + } } + VLOG(3) << "readers size: " << readers_.size(); } template void DatasetImpl::DestroyReaders() { VLOG(3) << "Calling DestroyReaders()"; - // clear memory_data_ before fill it - // because if LoadIntoMemory but no Shuffle, - // memory_data_ has empty data which has been std::move to channel - if (memory_data_.size() != 0) { - std::vector().swap(memory_data_); - } - std::vector fill_threads; - for (int i = 0; i < thread_num_; ++i) { - fill_threads.push_back( - std::thread(&paddle::framework::DataFeed::FillChannelToMemoryData, - readers_[i].get())); - } - for (std::thread& t : fill_threads) { - t.join(); - } std::vector>().swap(readers_); VLOG(3) << "readers size: " << readers_.size(); - // if memory_data_ is empty, which means it's not InMemory mode, - // so the next epoch should read all data again - if (memory_data_.size() == 0) { - file_idx_ = 0; - } + file_idx_ = 0; + cur_channel_ = 1 - cur_channel_; } template int64_t 
DatasetImpl::GetMemoryDataSize() { - return memory_data_.size(); + return input_channel_->Size(); } template int64_t DatasetImpl::GetShuffleDataSize() { int64_t sum = 0; - for (int i = 0; i < readers_.size(); ++i) { - sum += readers_[i]->GetChannelDataSize(); + for (size_t i = 0; i < multi_output_channel_.size(); ++i) { + sum += multi_output_channel_[i]->Size() + multi_consume_channel_[i]->Size(); } return sum; } @@ -285,16 +391,34 @@ int DatasetImpl::ReceiveFromClient(int msg_type, int client_id, #ifdef _LINUX VLOG(3) << "ReceiveFromClient msg_type=" << msg_type << ", client_id=" << client_id << ", msg length=" << msg.length(); + if (msg.length() == 0) { + return 0; + } + paddle::framework::BinaryArchive ar; + ar.SetReadBuffer(const_cast(msg.c_str()), msg.length(), nullptr); + if (ar.Cursor() == ar.Finish()) { + return 0; + } + std::vector data; + while (ar.Cursor() < ar.Finish()) { + data.push_back(ar.Get()); + } + CHECK(ar.Cursor() == ar.Finish()); + auto fleet_ptr = FleetWrapper::GetInstance(); - int64_t index = fleet_ptr->LocalRandomEngine()() % thread_num_; + int64_t index = fleet_ptr->LocalRandomEngine()() % channel_num_; VLOG(3) << "ramdom index=" << index; - readers_[index]->PutInsToChannel(msg); + multi_output_channel_[index]->Write(std::move(data)); + + data.clear(); + data.shrink_to_fit(); #endif return 0; } // explicit instantiation template class DatasetImpl>; +template class DatasetImpl; } // end namespace framework } // end namespace paddle diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index ffbc7bfd95b2bada56b0d6280d05bf678645fb1c..bb17b25c5a8e112c264eaf45dd0d73794a02165c 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -55,6 +55,8 @@ class Dataset { // set data fedd desc, which contains: // data feed name, batch size, slots virtual void SetDataFeedDesc(const std::string& data_feed_desc_str) = 0; + // set channel num + virtual void SetChannelNum(int channel_num) = 0; // 
get file list virtual const std::vector& GetFileList() = 0; // get thread num @@ -67,14 +69,21 @@ class Dataset { virtual std::pair GetHdfsConfig() = 0; // get data fedd desc virtual const paddle::framework::DataFeedDesc& GetDataFeedDesc() = 0; + // get channel num + virtual int GetChannelNum() = 0; // get readers, the reader num depend both on thread num // and filelist size - virtual std::vector>& - GetReaders() = 0; + virtual std::vector GetReaders() = 0; + // create input channel and output channel + virtual void CreateChannel() = 0; // register message handler between workers virtual void RegisterClientToClientMsgHandler() = 0; // load all data into memory virtual void LoadIntoMemory() = 0; + // load all data into memory in async mode + virtual void PreLoadIntoMemory() = 0; + // wait async load done + virtual void WaitPreLoadDone() = 0; // release all memory data virtual void ReleaseMemory() = 0; // local shuffle data @@ -110,6 +119,7 @@ class DatasetImpl : public Dataset { virtual void SetHdfsConfig(const std::string& fs_name, const std::string& fs_ugi); virtual void SetDataFeedDesc(const std::string& data_feed_desc_str); + virtual void SetChannelNum(int channel_num); virtual const std::vector& GetFileList() { return filelist_; } virtual int GetThreadNum() { return thread_num_; } @@ -121,11 +131,13 @@ class DatasetImpl : public Dataset { virtual const paddle::framework::DataFeedDesc& GetDataFeedDesc() { return data_feed_desc_; } - virtual std::vector>& - GetReaders(); - + virtual int GetChannelNum() { return channel_num_; } + virtual std::vector GetReaders(); + virtual void CreateChannel(); virtual void RegisterClientToClientMsgHandler(); virtual void LoadIntoMemory(); + virtual void PreLoadIntoMemory(); + virtual void WaitPreLoadDone(); virtual void ReleaseMemory(); virtual void LocalShuffle(); virtual void GlobalShuffle(); @@ -138,8 +150,14 @@ class DatasetImpl : public Dataset { virtual int ReceiveFromClient(int msg_type, int client_id, const std::string& 
msg); std::vector> readers_; - std::vector memory_data_; - std::mutex mutex_for_update_memory_data_; + paddle::framework::Channel input_channel_; + int channel_num_; + std::vector> multi_output_channel_; + std::vector> multi_consume_channel_; + // when read ins, we put ins from one channel to the other, + // and when finish reading, we set cur_channel = 1 - cur_channel, + // so if cur_channel=0, all data are in output_channel, else consume_channel + int cur_channel_; int thread_num_; paddle::framework::DataFeedDesc data_feed_desc_; int trainer_num_; @@ -148,12 +166,13 @@ class DatasetImpl : public Dataset { std::mutex mutex_for_pick_file_; std::string fs_name_; std::string fs_ugi_; - unsigned int rand_seed; int64_t fleet_send_batch_size_; + int64_t fleet_send_sleep_seconds_; + std::vector preload_threads_; }; // use std::vector as data type -class MultiSlotDataset : public DatasetImpl> { +class MultiSlotDataset : public DatasetImpl { public: MultiSlotDataset() {} virtual ~MultiSlotDataset() {} diff --git a/paddle/fluid/framework/dataset_factory.cc b/paddle/fluid/framework/dataset_factory.cc index 60be4cf9a43c01666c94018b7339da5f3ba797e5..3a28c101d48342ef639956a974d59aee8ae42ed6 100644 --- a/paddle/fluid/framework/dataset_factory.cc +++ b/paddle/fluid/framework/dataset_factory.cc @@ -21,14 +21,14 @@ limitations under the License. 
*/ namespace paddle { namespace framework { -typedef std::shared_ptr (*CreateDatasetFunction)(); +typedef std::unique_ptr (*CreateDatasetFunction)(); typedef std::unordered_map datasetMap; datasetMap g_dataset_map; #define REGISTER_DATASET_CLASS(dataset_class) \ namespace { \ - std::shared_ptr Creator_##dataset_class() { \ - return std::shared_ptr(new dataset_class); \ + std::unique_ptr Creator_##dataset_class() { \ + return std::unique_ptr(new dataset_class); \ } \ class __Registerer_##dataset_class { \ public: \ @@ -50,7 +50,7 @@ std::string DatasetFactory::DatasetTypeList() { return dataset_types; } -std::shared_ptr DatasetFactory::CreateDataset( +std::unique_ptr DatasetFactory::CreateDataset( std::string dataset_class) { if (g_dataset_map.count(dataset_class) < 1) { LOG(WARNING) << "Your Dataset " << dataset_class diff --git a/paddle/fluid/framework/dataset_factory.h b/paddle/fluid/framework/dataset_factory.h index 2894b69f8faca4b261347ed3b55e965ff8ee53fa..d4a36cec22fc0af27a38ee7cd810a2eaa7988ea1 100644 --- a/paddle/fluid/framework/dataset_factory.h +++ b/paddle/fluid/framework/dataset_factory.h @@ -23,7 +23,7 @@ namespace framework { class DatasetFactory { public: static std::string DatasetTypeList(); - static std::shared_ptr CreateDataset(std::string dataset_class); + static std::unique_ptr CreateDataset(std::string dataset_class); }; } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/device_worker.cc b/paddle/fluid/framework/device_worker.cc index 443acf0a16303ef47d24b3013ed92929d0d7839e..7fe60b4446a1c888b4f0a1b3ad1897eea4829bb9 100644 --- a/paddle/fluid/framework/device_worker.cc +++ b/paddle/fluid/framework/device_worker.cc @@ -19,7 +19,7 @@ namespace framework { void DeviceWorker::SetRootScope(Scope* root_scope) { root_scope_ = root_scope; } -void DeviceWorker::SetDataFeed(const std::shared_ptr& data_feed) { +void DeviceWorker::SetDataFeed(DataFeed* data_feed) { device_reader_ = data_feed; } diff --git 
a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index be5f663e1c96c5500093f3cceb2716a185224a1d..035392ef318cb7861a7db185df990ec9bf7bbfb9 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -113,7 +113,7 @@ class DeviceWorker { // will make this zero copy in the future virtual void BindingDataFeedMemory() = 0; virtual void SetRootScope(Scope* root_scope); - virtual void SetDataFeed(const std::shared_ptr& data_feed); + virtual void SetDataFeed(DataFeed* data_feed); virtual void SetPlace(const paddle::platform::Place& place) { place_ = place; } @@ -121,7 +121,7 @@ class DeviceWorker { protected: Scope* root_scope_; paddle::platform::Place place_; - std::shared_ptr device_reader_; + DataFeed* device_reader_; int64_t batch_num_; FetchConfig fetch_config_; bool use_cvm_; diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc index 481e12fcd63e77b6d42143f93df69c0f6abe7f25..8cd0789c0aeb429827e97804fb8afaed4214a75c 100644 --- a/paddle/fluid/framework/dist_multi_trainer.cc +++ b/paddle/fluid/framework/dist_multi_trainer.cc @@ -27,8 +27,7 @@ void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc, thread_num_ = trainer_desc.thread_num(); SetDataset(dataset); - dataset->CreateReaders(); - const std::vector> readers = + const std::vector readers = dataset->GetReaders(); thread_num_ = readers.size(); @@ -72,7 +71,6 @@ void DistMultiTrainer::Finalize() { th.join(); } pull_dense_worker_->Stop(); - dataset_ptr_->DestroyReaders(); root_scope_->DropKids(); } diff --git a/paddle/fluid/framework/expect.h b/paddle/fluid/framework/expect.h new file mode 100644 index 0000000000000000000000000000000000000000..146f4de9382a687686d5f7fdd6f4fa2300cb043b --- /dev/null +++ b/paddle/fluid/framework/expect.h @@ -0,0 +1,32 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +#ifdef _LINUX +#ifndef likely +#define likely(x) __builtin_expect((x), 1) +#endif +#endif + +#ifdef _LINUX +#ifndef unlikely +#define unlikely(x) __builtin_expect((x), 0) +#endif +#endif diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc index 3a266e4bda91d5962ce09b241cc5e5671d67a142..8cbf2efa81a906a11331e067522b79f8df5204b2 100644 --- a/paddle/fluid/framework/multi_trainer.cc +++ b/paddle/fluid/framework/multi_trainer.cc @@ -26,9 +26,7 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc, thread_num_ = trainer_desc.thread_num(); SetDataset(dataset); // get filelist from trainer_desc here - dataset->CreateReaders(); - VLOG(3) << "readers created"; - const std::vector> readers = + const std::vector readers = dataset->GetReaders(); VLOG(3) << "readers num: " << readers.size(); // change thread num to readers num @@ -75,7 +73,6 @@ void MultiTrainer::Finalize() { for (auto& th : threads_) { th.join(); } - dataset_ptr_->DestroyReaders(); root_scope_->DropKids(); } diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index 3edffd434a1610bf0c4d8d9464de76fd1d9a6944..916359ab6b181ce4746e8359a10f5aceaa74d2eb 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -28,9 +28,7 
@@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc, SetDataset(dataset); // get filelist from trainer_desc here - dataset->CreateReaders(); - VLOG(3) << "readers created"; - const std::vector> readers = + const std::vector readers = dataset->GetReaders(); VLOG(3) << "readers num: " << readers.size(); @@ -259,7 +257,6 @@ void PipelineTrainer::Finalize() { pipeline_scopes_[0]->FindVar(var)->Get(); TensorCopySync(thread_tensor, platform::CPUPlace(), root_tensor); } - dataset_ptr_->DestroyReaders(); root_scope_->DropKids(); } diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index b491725974ca117a1ddd7573e46ecc5d127759cf..5fe296ff20df74947c206d28aa44f27a45042d81 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -74,7 +74,7 @@ class MultiTrainer : public TrainerBase { protected: int thread_num_; std::vector threads_; - std::vector> readers_; + std::vector readers_; std::vector> workers_; }; @@ -136,7 +136,7 @@ class PipelineTrainer : public TrainerBase { std::vector> sync_functors_; std::shared_ptr nccl_ctx_map_; - std::vector> readers_; + std::vector readers_; void InitFirstScopeQueue(ScopeQueue* scope_queue, int pipeline_id, const ProgramDesc& main_program); diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index 3e2c976076aa1e1760511c31f77cef132e116dd2..dd94ac500cf8bf0dba7e7b6d8dc6688c35a5f51e 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -42,33 +42,64 @@ namespace paddle { namespace pybind { void BindDataset(py::module* m) { - py::class_>(*m, + py::class_>(*m, "Dataset") .def(py::init([](const std::string& name = "MultiSlotDataset") { return framework::DatasetFactory::CreateDataset(name); })) - .def("set_filelist", &framework::Dataset::SetFileList) - .def("set_thread_num", &framework::Dataset::SetThreadNum) - .def("set_trainer_num", &framework::Dataset::SetTrainerNum) + .def("set_filelist", 
&framework::Dataset::SetFileList, + py::call_guard()) + .def("set_thread_num", &framework::Dataset::SetThreadNum, + py::call_guard()) + .def("set_trainer_num", &framework::Dataset::SetTrainerNum, + py::call_guard()) .def("set_fleet_send_batch_size", - &framework::Dataset::SetFleetSendBatchSize) - .def("set_hdfs_config", &framework::Dataset::SetHdfsConfig) - .def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc) - .def("get_filelist", &framework::Dataset::GetFileList) - .def("get_thread_num", &framework::Dataset::GetThreadNum) - .def("get_trainer_num", &framework::Dataset::GetTrainerNum) + &framework::Dataset::SetFleetSendBatchSize, + py::call_guard()) + .def("set_hdfs_config", &framework::Dataset::SetHdfsConfig, + py::call_guard()) + .def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc, + py::call_guard()) + .def("get_filelist", &framework::Dataset::GetFileList, + py::call_guard()) + .def("get_thread_num", &framework::Dataset::GetThreadNum, + py::call_guard()) + .def("get_trainer_num", &framework::Dataset::GetTrainerNum, + py::call_guard()) .def("get_fleet_send_batch_size", - &framework::Dataset::GetFleetSendBatchSize) - .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig) - .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc) + &framework::Dataset::GetFleetSendBatchSize, + py::call_guard()) + .def("get_hdfs_config", &framework::Dataset::GetHdfsConfig, + py::call_guard()) + .def("get_data_feed_desc", &framework::Dataset::GetDataFeedDesc, + py::call_guard()) .def("register_client2client_msg_handler", - &framework::Dataset::RegisterClientToClientMsgHandler) - .def("load_into_memory", &framework::Dataset::LoadIntoMemory) - .def("release_memory", &framework::Dataset::ReleaseMemory) - .def("local_shuffle", &framework::Dataset::LocalShuffle) - .def("global_shuffle", &framework::Dataset::GlobalShuffle) - .def("get_memory_data_size", &framework::Dataset::GetMemoryDataSize) - .def("get_shuffle_data_size", 
&framework::Dataset::GetShuffleDataSize); + &framework::Dataset::RegisterClientToClientMsgHandler, + py::call_guard()) + .def("create_channel", &framework::Dataset::CreateChannel, + py::call_guard()) + .def("create_readers", &framework::Dataset::CreateReaders, + py::call_guard()) + .def("destroy_readers", &framework::Dataset::DestroyReaders, + py::call_guard()) + .def("load_into_memory", &framework::Dataset::LoadIntoMemory, + py::call_guard()) + .def("preload_into_memory", &framework::Dataset::PreLoadIntoMemory, + py::call_guard()) + .def("wait_preload_done", &framework::Dataset::WaitPreLoadDone, + py::call_guard()) + .def("release_memory", &framework::Dataset::ReleaseMemory, + py::call_guard()) + .def("local_shuffle", &framework::Dataset::LocalShuffle, + py::call_guard()) + .def("global_shuffle", &framework::Dataset::GlobalShuffle, + py::call_guard()) + .def("get_memory_data_size", &framework::Dataset::GetMemoryDataSize, + py::call_guard()) + .def("get_shuffle_data_size", &framework::Dataset::GetShuffleDataSize, + py::call_guard()) + .def("set_queue_num", &framework::Dataset::SetChannelNum, + py::call_guard()); } } // end namespace pybind diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index b3d58a589bd0286967abb1cee016d8d401c3a62a..8c886ca23b6871527f173c490ff4fe2f5ff83515 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -71,6 +71,7 @@ class DatasetBase(object): self.proto_desc.pipe_command = "cat" self.dataset = core.Dataset("MultiSlotDataset") self.thread_num = 0 + self.filelist = [] def set_pipe_command(self, pipe_command): """ @@ -139,6 +140,7 @@ class DatasetBase(object): filelist(list): file list """ self.dataset.set_filelist(filelist) + self.filelist = filelist def set_use_var(self, var_list): """ @@ -193,7 +195,14 @@ class DatasetBase(object): Set data_feed_desc before load or shuffle, user no need to call this function. 
""" + if self.thread_num > len(self.filelist): + self.thread_num = len(self.filelist) + self.dataset.set_thread_num(self.thread_num) self.dataset.set_data_feed_desc(self.desc()) + self.dataset.create_readers() + + def _finish_to_run(self): + self.dataset.destroy_readers() def desc(self): """ @@ -226,6 +235,57 @@ class InMemoryDataset(DatasetBase): """ Init. """ super(InMemoryDataset, self).__init__() self.proto_desc.name = "MultiSlotInMemoryDataFeed" + self.fleet_send_batch_size = 80000 + self.queue_num = None + + def _prepare_to_run(self): + """ + Set data_feed_desc before load or shuffle, + user no need to call this function. + """ + if self.thread_num > len(self.filelist): + self.thread_num = len(self.filelist) + self.dataset.set_thread_num(self.thread_num) + if self.queue_num is None: + self.queue_num = self.thread_num + self.dataset.set_queue_num(self.queue_num) + self.dataset.set_data_feed_desc(self.desc()) + self.dataset.create_channel() + self.dataset.create_readers() + + def set_queue_num(self, queue_num): + """ + Set Dataset output queue num, training threads get data from queues + + Args: + set_queue_num(int): dataset output queue num + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_queue_num(12) + + """ + self.queue_num = queue_num + + def set_fleet_send_batch_size(self, fleet_send_batch_size): + """ + Set fleet send batch size, default is 80000 + + Args: + fleet_send_batch_size(int): fleet send batch size + + Examples: + .. 
code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_fleet_send_batch_size(800) + + """ + self.fleet_send_batch_size = fleet_send_batch_size def load_into_memory(self): """ @@ -243,6 +303,39 @@ class InMemoryDataset(DatasetBase): self._prepare_to_run() self.dataset.load_into_memory() + def preload_into_memory(self): + """ + Load data into memory in async mode + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.preload_into_memory() + dataset.wait_preload_done() + """ + self._prepare_to_run() + self.dataset.preload_into_memory() + + def wait_preload_done(self): + """ + Wait preload_into_memory done + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.preload_into_memory() + dataset.wait_preload_done() + """ + self.dataset.wait_preload_done() + def local_shuffle(self): """ Local shuffle @@ -282,13 +375,12 @@ class InMemoryDataset(DatasetBase): """ trainer_num = 1 - fleet_send_batch_size = 80000 if fleet is not None: fleet._role_maker._barrier_worker() trainer_num = fleet.worker_num() self.dataset.register_client2client_msg_handler() self.dataset.set_trainer_num(trainer_num) - self.dataset.set_fleet_send_batch_size(fleet_send_batch_size) + self.dataset.set_fleet_send_batch_size(self.fleet_send_batch_size) if fleet is not None: fleet._role_maker._barrier_worker() self.dataset.global_shuffle() diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 1bca5bb3a14657c409960db2d10ed90936d11de5..a44570abbbea3a47c8c4a4f957b8224e815c051b 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -889,6 +889,7 @@ class 
Executor(object): if dataset == None: raise RuntimeError("dataset is needed and should be initialized") + dataset._prepare_to_run() scope, trainer = self._prepare_trainer( program=program, dataset=dataset, @@ -900,11 +901,11 @@ class Executor(object): print_period=print_period) trainer._set_infer(True) trainer._gen_trainer_desc() - dataset._prepare_to_run() self._dump_debug_info(program=program, trainer=trainer) self._default_executor.run_from_dataset(program.desc, scope, dataset.dataset, trainer._desc()) + dataset._finish_to_run() return None def train_from_dataset(self, @@ -969,6 +970,7 @@ class Executor(object): if dataset == None: raise RuntimeError("dataset is need and should be initialized") + dataset._prepare_to_run() scope, trainer = self._prepare_trainer( program=program, dataset=dataset, @@ -979,9 +981,9 @@ class Executor(object): fetch_info=fetch_info, print_period=print_period) trainer._gen_trainer_desc() - dataset._prepare_to_run() self._dump_debug_info(program=program, trainer=trainer) self._default_executor.run_from_dataset(program.desc, scope, dataset.dataset, trainer._desc()) + dataset._finish_to_run() return None