提交 def2a8b1 编写于 作者: C chengduoZH

Async memcpy

上级 437debf4
...@@ -20,79 +20,41 @@ namespace paddle { ...@@ -20,79 +20,41 @@ namespace paddle {
namespace framework { namespace framework {
namespace details { namespace details {
// Change it to thread safe flags if needed. template <class T>
class ThreadUnsafeOwnershipFlags { class COWPtr {
public: public:
explicit ThreadUnsafeOwnershipFlags(bool flag) : flag_(flag) {} typedef std::shared_ptr<T> RefPtr;
ThreadUnsafeOwnershipFlags(const ThreadUnsafeOwnershipFlags& other) = delete;
ThreadUnsafeOwnershipFlags& operator=(
const ThreadUnsafeOwnershipFlags& other) = delete;
ThreadUnsafeOwnershipFlags(ThreadUnsafeOwnershipFlags&& other) = default;
void SetOwnership(bool flag) { flag_ = flag; } private:
RefPtr m_sp;
// Invoke the callback if it is not owned. void detach() {
template <typename Callback> T* tmp = m_sp.get();
void AcquireOwnershipOnce(Callback acquire) { if (!(tmp == nullptr || m_sp.unique())) {
if (!flag_) { m_sp = RefPtr(new T(*tmp));
acquire();
flag_ = true;
} }
} }
private:
bool flag_;
};
// Copy-On-Write pointer.
// It will hold a T* pointer, and only copy once when `MutableData` is invoked.
//
// The template parameter OwnershipFlags should have:
// * a constructor takes a bool. True if own.
// * SetOwnership(bool flag).
// * AcquireOwnershipOnce(Callback). It will invoke the callback if it is not
// owned.
//
// https://en.wikipedia.org/wiki/Copy-on-write
template <typename T, typename OwnershipFlags = ThreadUnsafeOwnershipFlags>
class COWPtr {
public: public:
// Ctor from raw pointer. COWPtr() : m_sp(nullptr) {}
explicit COWPtr(T* ptr) : payload_(ptr), ownership_{true} {} explicit COWPtr(T* t) : m_sp(t) {}
explicit COWPtr(const RefPtr& refptr) : m_sp(refptr) {}
// Move methods. Steal ownership from origin const T& Data() const { return operator*(); }
COWPtr(COWPtr&& other)
: payload_(other.payload_), ownership_{std::move(other.ownership_)} {}
COWPtr& operator=(COWPtr&& origin) = default;
// Copy methods. Not own payload T* MutableData() { return operator->(); }
COWPtr(const COWPtr& other) : payload_(other.payload_), ownership_{false} {}
COWPtr& operator=(const COWPtr& other) {
payload_ = other.payload_;
ownership_.SetOwnership(false);
return *this;
}
// Access read only data.
const T& Data() const { return *payload_; }
// Access mutable data. If the data is not owned, the data will be copied const T& operator*() const { return *m_sp; }
// before. T& operator*() {
T* MutableData() { detach();
ownership_.AcquireOwnershipOnce( return *m_sp;
[this] { payload_.reset(new T(*payload_)); }); }
return payload_.get(); const T* operator->() const { return m_sp.operator->(); }
T* operator->() {
detach();
return m_sp.operator->();
} }
private:
// Actual data pointer.
std::shared_ptr<T> payload_;
// Ownership flag.
OwnershipFlags ownership_;
}; };
} // namespace details } // namespace details
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -30,6 +30,14 @@ TEST(COWPtr, all) { ...@@ -30,6 +30,14 @@ TEST(COWPtr, all) {
ASSERT_EQ(ptr2.Data(), 10); ASSERT_EQ(ptr2.Data(), 10);
} }
TEST(COWPtr, change_old) {
COWPtr<int> ptr(new int{0});
COWPtr<int> ptr2 = ptr;
*ptr.MutableData() = 10;
ASSERT_EQ(ptr2.Data(), 0);
ASSERT_EQ(ptr.Data(), 10);
}
} // namespace details } // namespace details
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -17,10 +17,12 @@ ...@@ -17,10 +17,12 @@
#include <algorithm> #include <algorithm>
#include <initializer_list> #include <initializer_list>
#include <memory> #include <memory>
#include <utility>
#include <vector> #include <vector>
#include "paddle/fluid/framework/details/cow_ptr.h"
#include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/memory/memcpy.h"
#include "glog/logging.h" #include "glog/logging.h"
...@@ -28,206 +30,392 @@ namespace paddle { ...@@ -28,206 +30,392 @@ namespace paddle {
namespace framework { namespace framework {
#if defined(PADDLE_WITH_CUDA) #if defined(PADDLE_WITH_CUDA)
namespace details {
struct CUDABuffer {
void *data_{nullptr};
size_t size_{0};
platform::CUDAPlace place_;
CUDABuffer() {}
CUDABuffer(platform::Place place, size_t size)
: size_(size), place_(boost::get<platform::CUDAPlace>(place)) {
data_ = memory::Alloc(place_, size);
}
~CUDABuffer() { ClearMemory(); }
CUDABuffer(const CUDABuffer &o) = delete;
CUDABuffer &operator=(const CUDABuffer &o) = delete;
void Resize(platform::Place place, size_t size) {
ClearMemory();
place_ = boost::get<platform::CUDAPlace>(place);
data_ = memory::Alloc(place_, size);
size_ = size;
}
void Swap(CUDABuffer &o) {
std::swap(data_, o.data_);
std::swap(place_, o.place_);
std::swap(size_, o.size_);
}
private:
void ClearMemory() const {
if (data_) {
memory::Free(place_, data_);
}
}
};
} // namespace details
// Vector<T> implements the std::vector interface, and can get Data or // Vector<T> implements the std::vector interface, and can get Data or
// MutableData from any place. The data will be synced implicitly inside. // MutableData from any place. The data will be synced implicitly inside.
template <typename T> template <typename T>
class Vector { class Vector {
public: public:
using value_type = T; using value_type = T;
using iterator = typename std::vector<T>::iterator;
using const_iterator = typename std::vector<T>::const_iterator;
// Default ctor. Create empty Vector private:
Vector() { InitEmpty(); } // The actual class to implement vector logic
class VectorData {
public:
VectorData() : flag_(kDataInCPU) {}
VectorData(size_t count, const T &value)
: cpu_(count, value), flag_(kDataInCPU) {}
VectorData(std::initializer_list<T> init) : cpu_(init), flag_(kDataInCPU) {}
template <typename U>
explicit VectorData(const std::vector<U> &dat)
: cpu_(dat), flag_(kDataInCPU) {}
VectorData(const VectorData &o) {
o.ImmutableCPU();
cpu_ = o.cpu_;
flag_ = kDataInCPU;
}
// Fill vector with value. The vector size is `count`. VectorData &operator=(const VectorData &o) {
explicit Vector(size_t count, const T &value = T()) { o.ImmutableCPU();
InitEmpty(); cpu_ = o.cpu_;
if (count != 0) { flag_ = kDataInCPU;
resize(count); details::CUDABuffer null;
T *ptr = begin(); gpu_.Swap(null);
for (size_t i = 0; i < count; ++i) { return *this;
ptr[i] = value; }
T &operator[](size_t i) {
MutableCPU();
return cpu_[i];
}
const T &operator[](size_t i) const {
ImmutableCPU();
return cpu_[i];
}
size_t size() const { return cpu_.size(); }
iterator begin() {
MutableCPU();
return cpu_.begin();
}
iterator end() {
MutableCPU();
return cpu_.end();
}
T &front() {
MutableCPU();
return cpu_.front();
}
T &back() {
MutableCPU();
return cpu_.back();
}
const_iterator begin() const {
ImmutableCPU();
return cpu_.begin();
}
const_iterator end() const {
ImmutableCPU();
return cpu_.end();
}
const T &back() const {
ImmutableCPU();
return cpu_.back();
}
T *data() { return &(*this)[0]; }
const T *data() const { return &(*this)[0]; }
const T &front() const {
ImmutableCPU();
return cpu_.front();
}
// assign this from iterator.
// NOTE: the iterator must support `end-begin`
template <typename Iter>
void assign(Iter begin, Iter end) {
MutableCPU();
cpu_.assign(begin, end);
}
// push_back. If the previous capacity is not enough, the memory will
// double.
void push_back(T elem) {
MutableCPU();
cpu_.push_back(elem);
}
// extend a vector by iterator.
// NOTE: the iterator must support end-begin
template <typename It>
void Extend(It begin, It end) {
MutableCPU();
cpu_.reserve((end - begin) + cpu_.size());
std::copy(begin, end, cpu_.begin());
}
// resize the vector
void resize(size_t size) {
MutableCPU();
cpu_.resize(size);
}
// get cuda ptr. immutable
const T *CUDAData(platform::Place place) const {
PADDLE_ENFORCE(platform::is_gpu_place(place),
"CUDA Data must on CUDA place");
ImmutableCUDA(place);
return reinterpret_cast<T *>(gpu_.data_);
}
// get cuda ptr. mutable
T *CUDAMutableData(platform::Place place) {
const T *ptr = CUDAData(place);
flag_ = kDirty | kDataInCUDA;
return const_cast<T *>(ptr);
}
// clear
void clear() {
cpu_.clear();
flag_ = kDirty | kDataInCPU;
}
size_t capacity() const { return cpu_.capacity(); }
// reserve data
void reserve(size_t size) { cpu_.reserve(size); }
// implicit cast operator. Vector can be cast to std::vector implicitly.
operator std::vector<T>() const {
ImmutableCPU();
return cpu_;
}
bool operator==(const VectorData &other) const {
ImmutableCPU();
other.ImmutableCPU();
return cpu_ == other.cpu_;
}
private:
enum DataFlag {
kDataInCPU = 0x01,
kDataInCUDA = 0x02,
// kDirty means the data has been changed in one device.
kDirty = 0x10
};
void CopyToCPU() const {
// COPY GPU Data To CPU
void *src = gpu_.data_;
void *dst = cpu_.data();
memory::Copy(platform::CPUPlace(), dst, gpu_.place_, src, gpu_.size_,
nullptr);
}
void MutableCPU() {
if (IsInCUDA() && IsDirty()) {
CopyToCPU();
} }
flag_ = kDirty | kDataInCPU;
} }
}
// Ctor with init_list void ImmutableCUDA(platform::Place place) const {
Vector(std::initializer_list<T> init) { if (IsDirty()) {
if (init.size() == 0) { if (IsInCPU()) {
InitEmpty(); CopyCPUDataToCUDA(place);
} else { UnsetFlag(kDirty);
InitByIter(init.size(), init.begin(), init.end()); SetFlag(kDataInCUDA);
} else if (IsInCUDA() &&
!(boost::get<platform::CUDAPlace>(place) == gpu_.place_)) {
CopyCUDADataToAnotherPlace(place);
// Still dirty
} else {
// Dirty && DataInCUDA && Device is same
// Do nothing
}
} else {
if (!IsInCUDA()) {
// Even data is not dirty. However, data is not in CUDA. Copy data.
CopyCPUDataToCUDA(place);
SetFlag(kDataInCUDA);
} else if (!(boost::get<platform::CUDAPlace>(place) == gpu_.place_)) {
CopyCUDADataToAnotherPlace(place);
} else {
// Not Dirty && DataInCUDA && Device is same
// Do nothing.
}
}
} }
} void CopyCUDADataToAnotherPlace(const platform::Place &place) const {
details::CUDABuffer tmp(place, gpu_.size_);
const void *src = gpu_.data_;
void *dst = tmp.data_;
memory::Copy(tmp.place_, dst, gpu_.place_, src, gpu_.size_, nullptr);
gpu_.Swap(tmp);
}
void CopyCPUDataToCUDA(const platform::Place &place) const {
void *src = cpu_.data();
gpu_.Resize(place, cpu_.size() * sizeof(T));
void *dst = gpu_.data_;
memory::Copy(boost::get<platform::CUDAPlace>(place), dst,
platform::CPUPlace(), src, gpu_.size_, nullptr);
}
void ImmutableCPU() const {
if (IsDirty() && !IsInCPU()) { // If data has been changed in CUDA, or
// CPU has no data.
CopyToCPU();
UnsetFlag(kDirty);
}
SetFlag(kDataInCPU);
}
void UnsetFlag(int flag) const { flag_ &= ~flag; }
void SetFlag(int flag) const { flag_ |= flag; }
bool IsDirty() const { return flag_ & kDirty; }
bool IsInCUDA() const { return flag_ & kDataInCUDA; }
bool IsInCPU() const { return flag_ & kDataInCPU; }
mutable std::vector<T> cpu_;
mutable details::CUDABuffer gpu_;
mutable int flag_;
};
public:
// Default ctor. Create empty Vector
Vector() : m_(new VectorData()) {}
// Fill vector with value. The vector size is `count`.
explicit Vector(size_t count, const T &value = T())
: m_(new VectorData(count, value)) {}
// Ctor with init_list
Vector(std::initializer_list<T> init) : m_(new VectorData(init)) {}
// implicit cast from std::vector. // implicit cast from std::vector.
template <typename U> template <typename U>
Vector(const std::vector<U> &dat) { // NOLINT Vector(const std::vector<U> &dat) : m_(new VectorData(dat)) { // NOLINT
if (dat.size() == 0) {
InitEmpty();
} else {
InitByIter(dat.size(), dat.begin(), dat.end());
}
} }
// Copy ctor // Copy ctor
Vector(const Vector<T> &other) { this->operator=(other); } Vector(const Vector<T> &other) { m_ = other.m_; }
// Copy operator // Copy operator
Vector<T> &operator=(const Vector<T> &other) { Vector<T> &operator=(const Vector<T> &other) {
if (other.size() != 0) { m_ = other.m_;
this->InitByIter(other.size(), other.begin(), other.end());
} else {
InitEmpty();
}
return *this; return *this;
} }
// Move ctor // Move ctor
Vector(Vector<T> &&other) { Vector(Vector<T> &&other) { m_ = std::move(other.m_); }
this->size_ = other.size_;
this->flag_ = other.flag_;
if (other.cuda_vec_.memory_size()) {
this->cuda_vec_.ShareDataWith(other.cuda_vec_);
}
if (other.cpu_vec_.memory_size()) {
this->cpu_vec_.ShareDataWith(other.cpu_vec_);
}
}
// CPU data access method. Mutable. // CPU data access method. Mutable.
T &operator[](size_t i) { T &operator[](size_t i) { return (*m_)[i]; }
MutableCPU();
return const_cast<T *>(cpu_vec_.data<T>())[i];
}
// CPU data access method. Immutable. // CPU data access method. Immutable.
const T &operator[](size_t i) const { const T &operator[](size_t i) const { return (*m_)[i]; }
ImmutableCPU();
return cpu_vec_.data<T>()[i];
}
// std::vector iterator methods. Based on CPU data access method // std::vector iterator methods. Based on CPU data access method
size_t size() const { return size_; } size_t size() const { return m_->size(); }
T *begin() { return capacity() == 0 ? &EmptyDummy() : &this->operator[](0); } iterator begin() { return m_->begin(); }
T *end() { iterator end() { return m_->end(); }
return capacity() == 0 ? &EmptyDummy() : &this->operator[](size());
}
T &front() { return *begin(); } T &front() { return m_->front(); }
T &back() { T &back() { return m_->back(); }
auto it = end();
--it;
return *it;
}
const T *begin() const { const_iterator begin() const { return m_->begin(); }
return capacity() == 0 ? &EmptyDummy() : &this->operator[](0);
}
const T *end() const { const_iterator end() const { return m_->end(); }
return capacity() == 0 ? &EmptyDummy() : &this->operator[](size());
}
const T *cbegin() const { return begin(); } const_iterator cbegin() const { return begin(); }
const T *cend() const { return end(); } const_iterator cend() const { return end(); }
const T &back() const { const T &back() const { return m_->back(); }
auto it = end();
--it;
return *it;
}
T *data() { return begin(); } T *data() { return m_->data(); }
const T *data() const { return begin(); } const T *data() const { return m_->data(); }
const T &front() const { return *begin(); } const T &front() const { return m_->front(); }
// end of std::vector iterator methods // end of std::vector iterator methods
// assign this from iterator. // assign this from iterator.
// NOTE: the iterator must support `end-begin` // NOTE: the iterator must support `end-begin`
template <typename Iter> template <typename Iter>
void assign(Iter begin, Iter end) { void assign(Iter begin, Iter end) {
InitByIter(end - begin, begin, end); m_->assign(begin, end);
} }
// push_back. If the previous capacity is not enough, the memory will // push_back. If the previous capacity is not enough, the memory will
// double. // double.
void push_back(T elem) { void push_back(T elem) { m_->push_back(elem); }
if (size_ + 1 > capacity()) {
reserve((size_ + 1) << 1);
}
*end() = elem;
++size_;
}
// extend a vector by iterator. // extend a vector by iterator.
// NOTE: the iterator must support end-begin // NOTE: the iterator must support end-begin
template <typename It> template <typename It>
void Extend(It begin, It end) { void Extend(It begin, It end) {
size_t pre_size = size_; m_->Extend(begin, end);
resize(pre_size + (end - begin));
T *ptr = this->begin() + pre_size;
for (; begin < end; ++begin, ++ptr) {
*ptr = *begin;
}
} }
// resize the vector // resize the vector
void resize(size_t size) { void resize(size_t size) { m_->resize(size); }
if (size + 1 <= capacity()) {
size_ = size;
} else {
MutableCPU();
Tensor cpu_tensor;
platform::Place cpu = platform::CPUPlace();
T *ptr = cpu_tensor.mutable_data<T>(
framework::make_ddim({static_cast<int64_t>(size)}), cpu);
const T *old_ptr =
cpu_vec_.memory_size() == 0 ? nullptr : cpu_vec_.data<T>();
if (old_ptr != nullptr) {
std::copy(old_ptr, old_ptr + size_, ptr);
}
size_ = size;
cpu_vec_.ShareDataWith(cpu_tensor);
}
}
// get cuda ptr. immutable // get cuda ptr. immutable
const T *CUDAData(platform::Place place) const { const T *CUDAData(platform::Place place) const { return m_->CUDAData(place); }
PADDLE_ENFORCE(platform::is_gpu_place(place),
"CUDA Data must on CUDA place");
ImmutableCUDA(place);
return cuda_vec_.data<T>();
}
// get cuda ptr. mutable // get cuda ptr. mutable
T *CUDAMutableData(platform::Place place) { T *CUDAMutableData(platform::Place place) {
const T *ptr = CUDAData(place); return m_->CUDAMutableData(place);
flag_ = kDirty | kDataInCUDA;
return const_cast<T *>(ptr);
} }
// clear // clear
void clear() { void clear() { m_->clear(); }
size_ = 0;
flag_ = kDirty | kDataInCPU;
}
size_t capacity() const { size_t capacity() const { return m_->capacity(); }
return cpu_vec_.memory_size() / SizeOfType(typeid(T));
}
// reserve data // reserve data
void reserve(size_t size) { void reserve(size_t size) { m_->reserve(size); }
size_t pre_size = size_;
resize(size);
resize(pre_size);
}
// the unify method to access CPU or CUDA data. immutable. // the unify method to access CPU or CUDA data. immutable.
const T *Data(platform::Place place) const { const T *Data(platform::Place place) const {
...@@ -248,12 +436,7 @@ class Vector { ...@@ -248,12 +436,7 @@ class Vector {
} }
// implicit cast operator. Vector can be cast to std::vector implicitly. // implicit cast operator. Vector can be cast to std::vector implicitly.
operator std::vector<T>() const { operator std::vector<T>() const { return *m_; }
std::vector<T> result;
result.resize(size());
std::copy(begin(), end(), result.begin());
return result;
}
bool operator==(const Vector<T> &other) const { bool operator==(const Vector<T> &other) const {
if (size() != other.size()) return false; if (size() != other.size()) return false;
...@@ -268,117 +451,8 @@ class Vector { ...@@ -268,117 +451,8 @@ class Vector {
} }
private: private:
void InitEmpty() { // Vector is an COW object.
size_ = 0; details::COWPtr<VectorData> m_;
flag_ = kDataInCPU;
}
template <typename Iter>
void InitByIter(size_t size, Iter begin, Iter end) {
platform::Place cpu = platform::CPUPlace();
T *ptr = this->cpu_vec_.template mutable_data<T>(
framework::make_ddim({static_cast<int64_t>(size)}), cpu);
for (size_t i = 0; i < size; ++i) {
*ptr++ = *begin++;
}
flag_ = kDataInCPU | kDirty;
size_ = size;
}
enum DataFlag {
kDataInCPU = 0x01,
kDataInCUDA = 0x02,
// kDirty means the data has been changed in one device.
kDirty = 0x10
};
void CopyToCPU() const {
// COPY GPU Data To CPU
TensorCopy(cuda_vec_, platform::CPUPlace(), &cpu_vec_);
WaitPlace(cuda_vec_.place());
}
void MutableCPU() {
if (IsInCUDA() && IsDirty()) {
CopyToCPU();
}
flag_ = kDirty | kDataInCPU;
}
void ImmutableCUDA(platform::Place place) const {
if (IsDirty()) {
if (IsInCPU()) {
TensorCopy(cpu_vec_, boost::get<platform::CUDAPlace>(place),
&cuda_vec_);
WaitPlace(place);
UnsetFlag(kDirty);
SetFlag(kDataInCUDA);
} else if (IsInCUDA() && !(place == cuda_vec_.place())) {
framework::Tensor tmp;
TensorCopy(cuda_vec_, boost::get<platform::CUDAPlace>(place), &tmp);
WaitPlace(cuda_vec_.place());
cuda_vec_.ShareDataWith(tmp);
// Still dirty
} else {
// Dirty && DataInCUDA && Device is same
// Do nothing
}
} else {
if (!IsInCUDA()) {
// Even data is not dirty. However, data is not in CUDA. Copy data.
TensorCopy(cpu_vec_, boost::get<platform::CUDAPlace>(place),
&cuda_vec_);
WaitPlace(place);
SetFlag(kDataInCUDA);
} else if (!(place == cuda_vec_.place())) {
framework::Tensor tmp;
WaitPlace(cuda_vec_.place());
TensorCopy(cuda_vec_, boost::get<platform::CUDAPlace>(place), &tmp);
WaitPlace(cuda_vec_.place());
WaitPlace(place);
cuda_vec_.ShareDataWith(tmp);
} else {
// Not Dirty && DataInCUDA && Device is same
// Do nothing.
}
}
}
void ImmutableCPU() const {
if (IsDirty() &&
!IsInCPU()) { // If data has been changed in CUDA, or CPU has no data.
CopyToCPU();
UnsetFlag(kDirty);
}
SetFlag(kDataInCPU);
}
void UnsetFlag(int flag) const { flag_ &= ~flag; }
void SetFlag(int flag) const { flag_ |= flag; }
bool IsDirty() const { return flag_ & kDirty; }
bool IsInCUDA() const { return flag_ & kDataInCUDA; }
bool IsInCPU() const { return flag_ & kDataInCPU; }
static void WaitPlace(const platform::Place place) {
if (platform::is_gpu_place(place)) {
platform::DeviceContextPool::Instance()
.Get(boost::get<platform::CUDAPlace>(place))
->Wait();
}
}
static T &EmptyDummy() {
static T dummy = T();
return dummy;
}
mutable int flag_;
mutable Tensor cpu_vec_;
mutable Tensor cuda_vec_;
size_t size_;
}; };
#else // PADDLE_WITH_CUDA #else // PADDLE_WITH_CUDA
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册