Commit ba8ba300 authored by Dang Qingqing

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into quantize_transpiler_update

@@ -20,41 +20,79 @@ namespace paddle {
 namespace framework {
 namespace details {
 
-template <class T>
-class COWPtr {
- public:
-  typedef std::shared_ptr<T> RefPtr;
-
- private:
-  RefPtr m_sp;
-
-  void detach() {
-    T* tmp = m_sp.get();
-    if (!(tmp == nullptr || m_sp.unique())) {
-      m_sp = RefPtr(new T(*tmp));
-    }
-  }
-
- public:
-  COWPtr() : m_sp(nullptr) {}
-  explicit COWPtr(T* t) : m_sp(t) {}
-  explicit COWPtr(const RefPtr& refptr) : m_sp(refptr) {}
-
-  const T& Data() const { return operator*(); }
-
-  T* MutableData() { return operator->(); }
-
-  const T& operator*() const { return *m_sp; }
-  T& operator*() {
-    detach();
-    return *m_sp;
-  }
-  const T* operator->() const { return m_sp.operator->(); }
-  T* operator->() {
-    detach();
-    return m_sp.operator->();
-  }
-};
+// Change it to thread safe flags if needed.
+class ThreadUnsafeOwnershipFlags {
+ public:
+  explicit ThreadUnsafeOwnershipFlags(bool flag) : flag_(flag) {}
+
+  ThreadUnsafeOwnershipFlags(const ThreadUnsafeOwnershipFlags& other) = delete;
+  ThreadUnsafeOwnershipFlags& operator=(
+      const ThreadUnsafeOwnershipFlags& other) = delete;
+
+  ThreadUnsafeOwnershipFlags(ThreadUnsafeOwnershipFlags&& other) = default;
+
+  void SetOwnership(bool flag) { flag_ = flag; }
+
+  // Invoke the callback if it is not owned.
+  template <typename Callback>
+  void AcquireOwnershipOnce(Callback acquire) {
+    if (!flag_) {
+      acquire();
+      flag_ = true;
+    }
+  }
+
+ private:
+  bool flag_;
+};
+
+// Copy-On-Write pointer.
+// It will hold a T* pointer, and only copy once when `MutableData` is invoked.
+//
+// The template parameter OwnershipFlags should have:
+//  * a constructor that takes a bool, true if owned.
+//  * SetOwnership(bool flag).
+//  * AcquireOwnershipOnce(Callback). It will invoke the callback if the data
+//    is not owned.
+//
+// https://en.wikipedia.org/wiki/Copy-on-write
+template <typename T, typename OwnershipFlags = ThreadUnsafeOwnershipFlags>
+class COWPtr {
+ public:
+  // Ctor from raw pointer.
+  explicit COWPtr(T* ptr) : payload_(ptr), ownership_{true} {}
+
+  // Move methods. Steal ownership from origin.
+  COWPtr(COWPtr&& other)
+      : payload_(other.payload_), ownership_{std::move(other.ownership_)} {}
+  COWPtr& operator=(COWPtr&& origin) = default;
+
+  // Copy methods. Do not own the payload.
+  COWPtr(const COWPtr& other) : payload_(other.payload_), ownership_{false} {}
+  COWPtr& operator=(const COWPtr& other) {
+    payload_ = other.payload_;
+    ownership_.SetOwnership(false);
+    return *this;
+  }
+
+  // Access read only data.
+  const T& Data() const { return *payload_; }
+
+  // Access mutable data. If the data is not owned, the data will be copied
+  // before.
+  T* MutableData() {
+    ownership_.AcquireOwnershipOnce(
+        [this] { payload_.reset(new T(*payload_)); });
+    return payload_.get();
+  }
+
+ private:
+  // Actual data pointer.
+  std::shared_ptr<T> payload_;
+
+  // Ownership flag.
+  OwnershipFlags ownership_;
+};
 
 }  // namespace details
 }  // namespace framework
 }  // namespace paddle
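For reference, a minimal usage sketch of the new COWPtr (a hypothetical check of the contract documented above, assuming the header is available as "paddle/fluid/framework/details/cow_ptr.h"):

#include <cassert>

#include "paddle/fluid/framework/details/cow_ptr.h"

int main() {
  using paddle::framework::details::COWPtr;
  COWPtr<int> ptr(new int{0});
  COWPtr<int> copy = ptr;               // copy shares the payload, does not own it
  assert(&ptr.Data() == &copy.Data());  // still the same underlying int
  *copy.MutableData() = 10;             // first mutable access copies exactly once
  assert(ptr.Data() == 0);              // the original payload is untouched
  assert(copy.Data() == 10);
  return 0;
}

Note that mutating through `ptr` itself would not copy, because `ptr` still owns the payload, so `copy` would observe the write; that asymmetry is presumably why the `change_old` test is deleted in the next hunk.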
@@ -30,14 +30,6 @@ TEST(COWPtr, all) {
   ASSERT_EQ(ptr2.Data(), 10);
 }
 
-TEST(COWPtr, change_old) {
-  COWPtr<int> ptr(new int{0});
-  COWPtr<int> ptr2 = ptr;
-  *ptr.MutableData() = 10;
-  ASSERT_EQ(ptr2.Data(), 0);
-  ASSERT_EQ(ptr.Data(), 10);
-}
-
 }  // namespace details
 }  // namespace framework
 }  // namespace paddle
@@ -210,43 +210,6 @@ std::vector<std::string> MultiDevSSAGraphBuilder::FindDistTrainRecvVars(
   return recv_vars;
 }
 
-bool MultiDevSSAGraphBuilder::IsDistTrainOp(
-    ir::Node *node, const std::vector<std::string> &send_vars,
-    const std::vector<std::string> &recv_vars) const {
-  if (send_vars.size() == 0 || recv_vars.size() == 0) {
-    return false;
-  }
-
-  /**
-   * Check whether any of opvars contains `.block` and is in sendvars.
-   */
-  auto checker = [](const std::vector<std::string> &opvars,
-                    const std::vector<std::string> &rpc_vars) -> bool {
-    for (auto &var : opvars) {
-      // a variable name with the suffix `.block` means it is a variable
-      // split by the DistributeTranspiler
-      // [python/paddle/fluid/transpiler/distribute_transpiler.py]
-      if (var.find(".block") != std::string::npos &&
-          std::find(rpc_vars.begin(), rpc_vars.end(), var) != rpc_vars.end()) {
-        return true;
-      }
-    }
-    return false;
-  };
-
-  std::vector<std::string> input_var_names;
-  std::vector<std::string> output_var_names;
-  for (ir::Node *input : node->inputs) {
-    input_var_names.push_back(input->Name());
-  }
-  for (ir::Node *output : node->outputs) {
-    output_var_names.push_back(output->Name());
-  }
-
-  return checker(output_var_names, send_vars) ||
-         checker(input_var_names, recv_vars);
-}
-
 size_t MultiDevSSAGraphBuilder::GetAppropriateDeviceID(
     const std::vector<std::string> &var_names) const {
   int64_t numel_sum = 0;
@@ -370,7 +333,9 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
         }
       }
       is_dist_train = true;
-    } else if (IsDistTrainOp(node, send_vars, recv_vars)) {
+    } else if (boost::get<int>(node->Op()->GetAttr(
+                   OpProtoAndCheckerMaker::OpRoleAttrName())) ==
+               static_cast<int>(OpRole::kDist)) {
       int op_dev_id = CreateDistTrainOp(&result, node);
       if (node->Op()->Type() == "concat") {
         auto origin_param_name = node->Op()->OutputArgumentNames()[0];
@@ -736,6 +701,7 @@ int MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result,
           .emplace(varname, op_dev_id);
     }
   } else {
+    LOG(ERROR) << "got unexpected dist op: " << node->Op()->Type();
     PADDLE_THROW(
         "the distribute training related op should be in [split_byref, "
         "concat].");
......
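The hunk above stops inferring distributed-training ops from the `.block` suffix of variable names and instead checks the op's role attribute. A self-contained sketch of that pattern (the attribute map and the "op_role" name here are simplified stand-ins for Paddle's `node->Op()->GetAttr(...)` plumbing, not the real API):

#include <cassert>
#include <map>
#include <string>

enum class OpRole { kForward = 0x0000, kDist = 0x0004 };

// Toy attribute storage standing in for the operator description.
using AttrMap = std::map<std::string, int>;

// Mirrors: boost::get<int>(op->GetAttr(role_attr_name)) ==
//          static_cast<int>(OpRole::kDist)
bool IsDistOp(const AttrMap &attrs, const std::string &role_attr_name) {
  auto it = attrs.find(role_attr_name);
  return it != attrs.end() && it->second == static_cast<int>(OpRole::kDist);
}

int main() {
  AttrMap attrs{{"op_role", static_cast<int>(OpRole::kDist)}};
  assert(IsDistOp(attrs, "op_role"));
  return 0;
}

The transpiler now tags such ops itself (see the `RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE` attribute added to the concat op near the end of this diff), so the graph pass no longer has to guess from variable names.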
@@ -51,12 +51,6 @@ class MultiDevSSAGraphBuilder : public ir::Pass {
   int CreateRPCOp(ir::Graph *result, ir::Node *node) const;
   int CreateDistTrainOp(ir::Graph *result, ir::Node *node) const;
 
-  /**
-   * Is this operator an end-point operator before/after the send operator?
-   */
-  bool IsDistTrainOp(ir::Node *node, const std::vector<std::string> &send_vars,
-                     const std::vector<std::string> &recv_vars) const;
-
   std::vector<std::string> FindDistTrainSendVars(
       const std::vector<ir::Node *> &nodes) const;
......
@@ -17,12 +17,10 @@
 #include <algorithm>
 #include <initializer_list>
 #include <memory>
-#include <utility>
 #include <vector>
 
-#include "paddle/fluid/framework/details/cow_ptr.h"
 #include "paddle/fluid/framework/tensor.h"
 #include "paddle/fluid/framework/tensor_util.h"
-#include "paddle/fluid/memory/memcpy.h"
 
 #include "glog/logging.h"
 
@@ -30,165 +28,173 @@ namespace paddle {
 namespace framework {
 
 #if defined(PADDLE_WITH_CUDA)
-namespace details {
-struct CUDABuffer {
-  void *data_{nullptr};
-  size_t size_{0};
-  platform::CUDAPlace place_;
-
-  CUDABuffer() {}
-  CUDABuffer(platform::Place place, size_t size)
-      : size_(size), place_(boost::get<platform::CUDAPlace>(place)) {
-    data_ = memory::Alloc(place_, size);
-  }
-
-  ~CUDABuffer() { ClearMemory(); }
-
-  CUDABuffer(const CUDABuffer &o) = delete;
-  CUDABuffer &operator=(const CUDABuffer &o) = delete;
-
-  void Resize(platform::Place place, size_t size) {
-    ClearMemory();
-    place_ = boost::get<platform::CUDAPlace>(place);
-    data_ = memory::Alloc(place_, size);
-    size_ = size;
-  }
-
-  void Swap(CUDABuffer &o) {
-    std::swap(data_, o.data_);
-    std::swap(place_, o.place_);
-    std::swap(size_, o.size_);
-  }
-
- private:
-  void ClearMemory() const {
-    if (data_) {
-      memory::Free(place_, data_);
-    }
-  }
-};
-}  // namespace details
-
 // Vector<T> implements the std::vector interface, and can get Data or
 // MutableData from any place. The data will be synced implicitly inside.
 template <typename T>
 class Vector {
  public:
   using value_type = T;
-  using iterator = typename std::vector<T>::iterator;
-  using const_iterator = typename std::vector<T>::const_iterator;
-
- private:
-  // The actual class to implement vector logic
-  class VectorData {
-   public:
-    VectorData() : flag_(kDataInCPU) {}
-    VectorData(size_t count, const T &value)
-        : cpu_(count, value), flag_(kDataInCPU) {}
-    VectorData(std::initializer_list<T> init) : cpu_(init), flag_(kDataInCPU) {}
-    template <typename U>
-    explicit VectorData(const std::vector<U> &dat)
-        : cpu_(dat), flag_(kDataInCPU) {}
-
-    VectorData(const VectorData &o) {
-      o.ImmutableCPU();
-      cpu_ = o.cpu_;
-      flag_ = kDataInCPU;
-    }
-
-    VectorData &operator=(const VectorData &o) {
-      o.ImmutableCPU();
-      cpu_ = o.cpu_;
-      flag_ = kDataInCPU;
-      details::CUDABuffer null;
-      gpu_.Swap(null);
-      return *this;
-    }
-
-    T &operator[](size_t i) {
-      MutableCPU();
-      return cpu_[i];
-    }
-
-    const T &operator[](size_t i) const {
-      ImmutableCPU();
-      return cpu_[i];
-    }
-
-    size_t size() const { return cpu_.size(); }
-
-    iterator begin() {
-      MutableCPU();
-      return cpu_.begin();
-    }
-
-    iterator end() {
-      MutableCPU();
-      return cpu_.end();
-    }
-
-    T &front() {
-      MutableCPU();
-      return cpu_.front();
-    }
-
-    T &back() {
-      MutableCPU();
-      return cpu_.back();
-    }
-
-    const_iterator begin() const {
-      ImmutableCPU();
-      return cpu_.begin();
-    }
-
-    const_iterator end() const {
-      ImmutableCPU();
-      return cpu_.end();
-    }
-
-    const T &back() const {
-      ImmutableCPU();
-      return cpu_.back();
-    }
-
-    T *data() { return &(*this)[0]; }
-
-    const T *data() const { return &(*this)[0]; }
-
-    const T &front() const {
-      ImmutableCPU();
-      return cpu_.front();
-    }
-
-    // assign this from iterator.
-    // NOTE: the iterator must support `end-begin`
-    template <typename Iter>
-    void assign(Iter begin, Iter end) {
-      MutableCPU();
-      cpu_.assign(begin, end);
-    }
-
-    // push_back. If the previous capacity is not enough, the memory will
-    // double.
-    void push_back(T elem) {
-      MutableCPU();
-      cpu_.push_back(elem);
-    }
-
-    // extend a vector by iterator.
-    // NOTE: the iterator must support end-begin
-    template <typename It>
-    void Extend(It begin, It end) {
-      MutableCPU();
-      auto out_it = std::back_inserter<std::vector<T>>(this->cpu_);
-      std::copy(begin, end, out_it);
-    }
-
-    // resize the vector
-    void resize(size_t size) {
-      MutableCPU();
-      cpu_.resize(size);
-    }
+
+  // Default ctor. Create empty Vector
+  Vector() { InitEmpty(); }
+
+  // Fill vector with value. The vector size is `count`.
+  explicit Vector(size_t count, const T &value = T()) {
+    InitEmpty();
+    if (count != 0) {
+      resize(count);
+      T *ptr = begin();
+      for (size_t i = 0; i < count; ++i) {
+        ptr[i] = value;
+      }
+    }
+  }
+
+  // Ctor with init_list
+  Vector(std::initializer_list<T> init) {
+    if (init.size() == 0) {
+      InitEmpty();
+    } else {
+      InitByIter(init.size(), init.begin(), init.end());
+    }
+  }
+
+  // implicit cast from std::vector.
+  template <typename U>
+  Vector(const std::vector<U> &dat) {  // NOLINT
+    if (dat.size() == 0) {
+      InitEmpty();
+    } else {
+      InitByIter(dat.size(), dat.begin(), dat.end());
+    }
+  }
+
+  // Copy ctor
+  Vector(const Vector<T> &other) { this->operator=(other); }
+
+  // Copy operator
+  Vector<T> &operator=(const Vector<T> &other) {
+    if (other.size() != 0) {
+      this->InitByIter(other.size(), other.begin(), other.end());
+    } else {
+      InitEmpty();
+    }
+    return *this;
+  }
+
+  // Move ctor
+  Vector(Vector<T> &&other) {
+    this->size_ = other.size_;
+    this->flag_ = other.flag_;
+    if (other.cuda_vec_.memory_size()) {
+      this->cuda_vec_.ShareDataWith(other.cuda_vec_);
+    }
+    if (other.cpu_vec_.memory_size()) {
+      this->cpu_vec_.ShareDataWith(other.cpu_vec_);
+    }
+  }
+
+  // CPU data access method. Mutable.
+  T &operator[](size_t i) {
+    MutableCPU();
+    return const_cast<T *>(cpu_vec_.data<T>())[i];
+  }
+
+  // CPU data access method. Immutable.
+  const T &operator[](size_t i) const {
+    ImmutableCPU();
+    return cpu_vec_.data<T>()[i];
+  }
+
+  // std::vector iterator methods. Based on CPU data access method
+  size_t size() const { return size_; }
+
+  T *begin() { return capacity() == 0 ? &EmptyDummy() : &this->operator[](0); }
+
+  T *end() {
+    return capacity() == 0 ? &EmptyDummy() : &this->operator[](size());
+  }
+
+  T &front() { return *begin(); }
+
+  T &back() {
+    auto it = end();
+    --it;
+    return *it;
+  }
+
+  const T *begin() const {
+    return capacity() == 0 ? &EmptyDummy() : &this->operator[](0);
+  }
+
+  const T *end() const {
+    return capacity() == 0 ? &EmptyDummy() : &this->operator[](size());
+  }
+
+  const T *cbegin() const { return begin(); }
+
+  const T *cend() const { return end(); }
+
+  const T &back() const {
+    auto it = end();
+    --it;
+    return *it;
+  }
+
+  T *data() { return begin(); }
+
+  const T *data() const { return begin(); }
+
+  const T &front() const { return *begin(); }
+  // end of std::vector iterator methods
+
+  // assign this from iterator.
+  // NOTE: the iterator must support `end-begin`
+  template <typename Iter>
+  void assign(Iter begin, Iter end) {
+    InitByIter(end - begin, begin, end);
+  }
+
+  // push_back. If the previous capacity is not enough, the memory will
+  // double.
+  void push_back(T elem) {
+    if (size_ + 1 > capacity()) {
+      reserve((size_ + 1) << 1);
+    }
+    *end() = elem;
+    ++size_;
+  }
+
+  // extend a vector by iterator.
+  // NOTE: the iterator must support end-begin
+  template <typename It>
+  void Extend(It begin, It end) {
+    size_t pre_size = size_;
+    resize(pre_size + (end - begin));
+    T *ptr = this->begin() + pre_size;
+    for (; begin < end; ++begin, ++ptr) {
+      *ptr = *begin;
+    }
+  }
+
+  // resize the vector
+  void resize(size_t size) {
+    if (size + 1 <= capacity()) {
+      size_ = size;
+    } else {
+      MutableCPU();
+      Tensor cpu_tensor;
+      platform::Place cpu = platform::CPUPlace();
+      T *ptr = cpu_tensor.mutable_data<T>(
+          framework::make_ddim({static_cast<int64_t>(size)}), cpu);
+      const T *old_ptr =
+          cpu_vec_.memory_size() == 0 ? nullptr : cpu_vec_.data<T>();
+      if (old_ptr != nullptr) {
+        std::copy(old_ptr, old_ptr + size_, ptr);
+      }
+      size_ = size;
+      cpu_vec_.ShareDataWith(cpu_tensor);
+    }
+  }
 
   // get cuda ptr. immutable
@@ -196,7 +202,7 @@ class Vector {
     PADDLE_ENFORCE(platform::is_gpu_place(place),
                    "CUDA Data must on CUDA place");
     ImmutableCUDA(place);
-    return reinterpret_cast<T *>(gpu_.data_);
+    return cuda_vec_.data<T>();
   }
 
   // get cuda ptr. mutable
@@ -208,28 +214,77 @@ class Vector {
   // clear
   void clear() {
-    cpu_.clear();
+    size_ = 0;
     flag_ = kDirty | kDataInCPU;
   }
 
-  size_t capacity() const { return cpu_.capacity(); }
+  size_t capacity() const {
+    return cpu_vec_.memory_size() / SizeOfType(typeid(T));
+  }
 
   // reserve data
-  void reserve(size_t size) { cpu_.reserve(size); }
+  void reserve(size_t size) {
+    size_t pre_size = size_;
+    resize(size);
+    resize(pre_size);
+  }
+
+  // the unify method to access CPU or CUDA data. immutable.
+  const T *Data(platform::Place place) const {
+    if (platform::is_gpu_place(place)) {
+      return CUDAData(place);
+    } else {
+      return data();
+    }
+  }
+
+  // the unify method to access CPU or CUDA data. mutable.
+  T *MutableData(platform::Place place) {
+    if (platform::is_gpu_place(place)) {
+      return CUDAMutableData(place);
+    } else {
+      return data();
+    }
+  }
 
   // implicit cast operator. Vector can be cast to std::vector implicitly.
   operator std::vector<T>() const {
-    ImmutableCPU();
-    return cpu_;
+    std::vector<T> result;
+    result.resize(size());
+    std::copy(begin(), end(), result.begin());
+    return result;
   }
 
-  bool operator==(const VectorData &other) const {
-    ImmutableCPU();
-    other.ImmutableCPU();
-    return cpu_ == other.cpu_;
+  bool operator==(const Vector<T> &other) const {
+    if (size() != other.size()) return false;
+    auto it1 = cbegin();
+    auto it2 = other.cbegin();
+    for (; it1 < cend(); ++it1, ++it2) {
+      if (*it1 != *it2) {
+        return false;
+      }
+    }
+    return true;
   }
 
  private:
+  void InitEmpty() {
+    size_ = 0;
+    flag_ = kDataInCPU;
+  }
+
+  template <typename Iter>
+  void InitByIter(size_t size, Iter begin, Iter end) {
+    platform::Place cpu = platform::CPUPlace();
+    T *ptr = this->cpu_vec_.template mutable_data<T>(
+        framework::make_ddim({static_cast<int64_t>(size)}), cpu);
+    for (size_t i = 0; i < size; ++i) {
+      *ptr++ = *begin++;
+    }
+    flag_ = kDataInCPU | kDirty;
+    size_ = size;
+  }
+
   enum DataFlag {
     kDataInCPU = 0x01,
     kDataInCUDA = 0x02,
@@ -239,10 +294,8 @@ class Vector {
   void CopyToCPU() const {
     // COPY GPU Data To CPU
-    void *src = gpu_.data_;
-    void *dst = cpu_.data();
-    memory::Copy(platform::CPUPlace(), dst, gpu_.place_, src, gpu_.size_,
-                 nullptr);
+    TensorCopy(cuda_vec_, platform::CPUPlace(), &cpu_vec_);
+    WaitPlace(cuda_vec_.place());
   }
 
   void MutableCPU() {
@@ -255,12 +308,16 @@ class Vector {
   void ImmutableCUDA(platform::Place place) const {
     if (IsDirty()) {
       if (IsInCPU()) {
-        CopyCPUDataToCUDA(place);
+        TensorCopy(cpu_vec_, boost::get<platform::CUDAPlace>(place),
+                   &cuda_vec_);
+        WaitPlace(place);
         UnsetFlag(kDirty);
         SetFlag(kDataInCUDA);
-      } else if (IsInCUDA() &&
-                 !(boost::get<platform::CUDAPlace>(place) == gpu_.place_)) {
-        CopyCUDADataToAnotherPlace(place);
+      } else if (IsInCUDA() && !(place == cuda_vec_.place())) {
+        framework::Tensor tmp;
+        TensorCopy(cuda_vec_, boost::get<platform::CUDAPlace>(place), &tmp);
+        WaitPlace(cuda_vec_.place());
+        cuda_vec_.ShareDataWith(tmp);
         // Still dirty
       } else {
         // Dirty && DataInCUDA && Device is same
@@ -269,38 +326,27 @@ class Vector {
     } else {
       if (!IsInCUDA()) {
         // Even data is not dirty. However, data is not in CUDA. Copy data.
-        CopyCPUDataToCUDA(place);
+        TensorCopy(cpu_vec_, boost::get<platform::CUDAPlace>(place),
+                   &cuda_vec_);
+        WaitPlace(place);
         SetFlag(kDataInCUDA);
-      } else if (!(boost::get<platform::CUDAPlace>(place) == gpu_.place_)) {
-        CopyCUDADataToAnotherPlace(place);
+      } else if (!(place == cuda_vec_.place())) {
+        framework::Tensor tmp;
+        WaitPlace(cuda_vec_.place());
+        TensorCopy(cuda_vec_, boost::get<platform::CUDAPlace>(place), &tmp);
+        WaitPlace(cuda_vec_.place());
+        WaitPlace(place);
+        cuda_vec_.ShareDataWith(tmp);
       } else {
         // Not Dirty && DataInCUDA && Device is same
         // Do nothing.
       }
     }
   }
 
-  void CopyCUDADataToAnotherPlace(const platform::Place &place) const {
-    details::CUDABuffer tmp(place, gpu_.size_);
-    const void *src = gpu_.data_;
-    void *dst = tmp.data_;
-
-    memory::Copy(tmp.place_, dst, gpu_.place_, src, gpu_.size_, nullptr);
-    gpu_.Swap(tmp);
-  }
-
-  void CopyCPUDataToCUDA(const platform::Place &place) const {
-    void *src = cpu_.data();
-    gpu_.Resize(place, cpu_.size() * sizeof(T));
-    void *dst = gpu_.data_;
-    auto stream = static_cast<platform::CUDADeviceContext *>(
-                      platform::DeviceContextPool::Instance().Get(place))
-                      ->stream();
-    memory::Copy(gpu_.place_, dst, platform::CPUPlace(), src, gpu_.size_,
-                 stream);
-  }
-
   void ImmutableCPU() const {
-    if (IsDirty() && !IsInCPU()) {  // If data has been changed in CUDA, or
-                                    // CPU has no data.
+    if (IsDirty() &&
+        !IsInCPU()) {  // If data has been changed in CUDA, or CPU has no data.
       CopyToCPU();
       UnsetFlag(kDirty);
     }
@@ -316,154 +362,23 @@ class Vector {
   bool IsInCPU() const { return flag_ & kDataInCPU; }
 
-  mutable std::vector<T> cpu_;
-  mutable details::CUDABuffer gpu_;
-  mutable int flag_;
-};
-
- public:
-  // Default ctor. Create empty Vector
-  Vector() : m_(new VectorData()) {}
-
-  // Fill vector with value. The vector size is `count`.
-  explicit Vector(size_t count, const T &value = T())
-      : m_(new VectorData(count, value)) {}
-
-  // Ctor with init_list
-  Vector(std::initializer_list<T> init) : m_(new VectorData(init)) {}
-
-  // implicit cast from std::vector.
-  template <typename U>
-  Vector(const std::vector<U> &dat) : m_(new VectorData(dat)) {  // NOLINT
-  }
-
-  // Copy ctor
-  Vector(const Vector<T> &other) { m_ = other.m_; }
-
-  // Copy operator
-  Vector<T> &operator=(const Vector<T> &other) {
-    m_ = other.m_;
-    return *this;
-  }
-
-  // Move ctor
-  Vector(Vector<T> &&other) { m_ = std::move(other.m_); }
-
-  // CPU data access method. Mutable.
-  T &operator[](size_t i) { return (*m_)[i]; }
-
-  // CPU data access method. Immutable.
-  const T &operator[](size_t i) const { return (*m_)[i]; }
-
-  // std::vector iterator methods. Based on CPU data access method
-  size_t size() const { return m_->size(); }
-
-  iterator begin() { return m_->begin(); }
-
-  iterator end() { return m_->end(); }
-
-  T &front() { return m_->front(); }
-
-  T &back() { return m_->back(); }
-
-  const_iterator begin() const { return m_->begin(); }
-
-  const_iterator end() const { return m_->end(); }
-
-  const_iterator cbegin() const { return begin(); }
-
-  const_iterator cend() const { return end(); }
-
-  const T &back() const { return m_->back(); }
-
-  T *data() { return m_->data(); }
-
-  const T *data() const { return m_->data(); }
-
-  const T &front() const { return m_->front(); }
-  // end of std::vector iterator methods
-
-  // assign this from iterator.
-  // NOTE: the iterator must support `end-begin`
-  template <typename Iter>
-  void assign(Iter begin, Iter end) {
-    m_->assign(begin, end);
-  }
-
-  // push_back. If the previous capacity is not enough, the memory will
-  // double.
-  void push_back(T elem) { m_->push_back(elem); }
-
-  // extend a vector by iterator.
-  // NOTE: the iterator must support end-begin
-  template <typename It>
-  void Extend(It begin, It end) {
-    m_->Extend(begin, end);
-  }
-
-  // resize the vector
-  void resize(size_t size) {
-    if (m_.Data().size() != size) {
-      m_->resize(size);
-    }
-  }
-
-  // get cuda ptr. immutable
-  const T *CUDAData(platform::Place place) const {
-    return m_.Data().CUDAData(place);
-  }
-
-  // get cuda ptr. mutable
-  T *CUDAMutableData(platform::Place place) {
-    return m_->CUDAMutableData(place);
-  }
-
-  // clear
-  void clear() { m_->clear(); }
-
-  size_t capacity() const { return m_->capacity(); }
-
-  // reserve data
-  void reserve(size_t size) { m_->reserve(size); }
-
-  // the unify method to access CPU or CUDA data. immutable.
-  const T *Data(platform::Place place) const {
-    if (platform::is_gpu_place(place)) {
-      return CUDAData(place);
-    } else {
-      return data();
-    }
-  }
-
-  // the unify method to access CPU or CUDA data. mutable.
-  T *MutableData(platform::Place place) {
-    if (platform::is_gpu_place(place)) {
-      return CUDAMutableData(place);
-    } else {
-      return data();
-    }
-  }
-
-  // implicit cast operator. Vector can be cast to std::vector implicitly.
-  operator std::vector<T>() const { return *m_; }
-
-  bool operator==(const Vector<T> &other) const {
-    if (size() != other.size()) return false;
-    auto it1 = cbegin();
-    auto it2 = other.cbegin();
-    for (; it1 < cend(); ++it1, ++it2) {
-      if (*it1 != *it2) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  const void *Handle() const { return &m_.Data(); }
-
- private:
-  // Vector is an COW object.
-  details::COWPtr<VectorData> m_;
+  static void WaitPlace(const platform::Place place) {
+    if (platform::is_gpu_place(place)) {
+      platform::DeviceContextPool::Instance()
+          .Get(boost::get<platform::CUDAPlace>(place))
+          ->Wait();
+    }
+  }
+
+  static T &EmptyDummy() {
+    static T dummy = T();
+    return dummy;
+  }
+
+  mutable int flag_;
+  mutable Tensor cpu_vec_;
+  mutable Tensor cuda_vec_;
+  size_t size_;
 };
 
 #else  // PADDLE_WITH_CUDA
......
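A hedged sketch of the implicit-sync contract that both Vector implementations in the hunk above promise (assumes a CUDA build with the Paddle headers on the include path; the comments restate the behaviour documented in the hunk, not a verified trace):

#include "paddle/fluid/framework/mixed_vector.h"
#include "paddle/fluid/platform/place.h"

void VectorSyncDemo() {
  paddle::framework::Vector<int> rows{0, 4, 7};
  rows.push_back(9);                       // CPU-side mutation marks the data dirty
  paddle::platform::CUDAPlace gpu(0);
  const int *d_rows = rows.CUDAData(gpu);  // first CUDA access copies CPU -> GPU
  rows[0] = 1;                             // operator[] marks the CUDA copy stale again
  d_rows = rows.CUDAData(gpu);             // next CUDA access re-syncs before returning
  (void)d_rows;
}

The two sides differ mainly in storage and copy cost: the removed version keeps a std::vector plus a raw CUDABuffer behind a COWPtr, so copying a Vector is cheap until a mutation, while the restored version keeps two framework::Tensor members and deep-copies on assignment.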
@@ -120,6 +120,7 @@ void OpProtoAndCheckerMaker::operator()(proto::OpProto* proto,
       {static_cast<int>(OpRole::kForward),
        static_cast<int>(OpRole::kBackward),
        static_cast<int>(OpRole::kOptimize), static_cast<int>(OpRole::kRPC),
+       static_cast<int>(OpRole::kDist), static_cast<int>(OpRole::kLRSched),
        static_cast<int>(OpRole::kLoss) | static_cast<int>(OpRole::kForward),
        static_cast<int>(OpRole::kLoss) |
            static_cast<int>(OpRole::kBackward),
......
@@ -26,7 +26,13 @@ enum class OpRole {
   kForward = 0x0000,
   kBackward = 0x0001,
   kOptimize = 0x0002,
+  // RPC role is for send/recv related ops.
   kRPC = 0x0003,
+  // Dist role is for split_byref/split_selected_rows/concat
+  // used for distributed training.
+  kDist = 0x0004,
+  // Tag all learning rate scheduler operators.
+  kLRSched = 0x0005,
 
   kLoss = 0x0100,
   // The default value of op's role. This should be only used for unittests and
......
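Only kLoss occupies its own bit; kRPC, kDist and kLRSched are plain enumerator values, which is why the attribute checker in the previous hunk whitelists exact combinations instead of testing arbitrary masks. A small self-contained illustration:

#include <cassert>

enum class OpRole {
  kForward = 0x0000,
  kBackward = 0x0001,
  kOptimize = 0x0002,
  kRPC = 0x0003,
  kDist = 0x0004,
  kLRSched = 0x0005,
  kLoss = 0x0100,
};

int main() {
  // The loss op on the forward pass carries both roles OR-ed together.
  int role =
      static_cast<int>(OpRole::kLoss) | static_cast<int>(OpRole::kForward);
  assert((role & static_cast<int>(OpRole::kLoss)) != 0);        // kLoss bit is set
  assert((role & 0xFF) == static_cast<int>(OpRole::kForward));  // low byte: base role
  return 0;
}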
@@ -76,8 +76,8 @@ class DetectionMAPOpKernel : public framework::OpKernel<T> {
     auto ap_type = GetAPType(ctx.Attr<std::string>("ap_type"));
     int class_num = ctx.Attr<int>("class_num");
 
-    auto& label_lod = in_label->lod();
-    auto& detect_lod = in_detect->lod();
+    auto label_lod = in_label->lod();
+    auto detect_lod = in_detect->lod();
     PADDLE_ENFORCE_EQ(label_lod.size(), 1UL,
                       "Only support one level sequence now.");
     PADDLE_ENFORCE_EQ(label_lod[0].size(), detect_lod[0].size(),
@@ -166,11 +166,11 @@ class DetectionMAPOpKernel : public framework::OpKernel<T> {
     auto labels = framework::EigenTensor<T, 2>::From(input_label);
     auto detect = framework::EigenTensor<T, 2>::From(input_detect);
 
-    auto& label_lod = input_label.lod();
-    auto& detect_lod = input_detect.lod();
+    auto label_lod = input_label.lod();
+    auto detect_lod = input_detect.lod();
 
     int batch_size = label_lod[0].size() - 1;
-    auto& label_index = label_lod[0];
+    auto label_index = label_lod[0];
 
     for (int n = 0; n < batch_size; ++n) {
       std::map<int, std::vector<Box>> boxes;
@@ -274,6 +274,7 @@ class DetectionMAPOpKernel : public framework::OpKernel<T> {
 
     output_true_pos->set_lod(true_pos_lod);
     output_false_pos->set_lod(false_pos_lod);
+    return;
   }
 
   void GetInputPos(const framework::Tensor& input_pos_count,
@@ -291,7 +292,7 @@ class DetectionMAPOpKernel : public framework::OpKernel<T> {
     auto SetData = [](const framework::LoDTensor& pos_tensor,
                       std::map<int, std::vector<std::pair<T, int>>>& pos) {
       const T* pos_data = pos_tensor.data<T>();
-      auto& pos_data_lod = pos_tensor.lod()[0];
+      auto pos_data_lod = pos_tensor.lod()[0];
       for (size_t i = 0; i < pos_data_lod.size() - 1; ++i) {
         for (size_t j = pos_data_lod[i]; j < pos_data_lod[i + 1]; ++j) {
           T score = pos_data[j * 2];
@@ -316,23 +317,20 @@ class DetectionMAPOpKernel : public framework::OpKernel<T> {
       std::map<int, std::vector<std::pair<T, int>>>* false_pos) const {
     int batch_size = gt_boxes.size();
     for (int n = 0; n < batch_size; ++n) {
-      auto& image_gt_boxes = gt_boxes[n];
-      for (auto& image_gt_box : image_gt_boxes) {
+      auto image_gt_boxes = gt_boxes[n];
+      for (auto it = image_gt_boxes.begin(); it != image_gt_boxes.end(); ++it) {
         size_t count = 0;
-        auto& labeled_bboxes = image_gt_box.second;
+        auto labeled_bboxes = it->second;
         if (evaluate_difficult) {
           count = labeled_bboxes.size();
         } else {
-          for (auto& box : labeled_bboxes) {
-            if (!box.is_difficult) {
-              ++count;
-            }
-          }
+          for (size_t i = 0; i < labeled_bboxes.size(); ++i)
+            if (!(labeled_bboxes[i].is_difficult)) ++count;
         }
         if (count == 0) {
           continue;
         }
-        int label = image_gt_box.first;
+        int label = it->first;
         if (label_pos_count->find(label) == label_pos_count->end()) {
           (*label_pos_count)[label] = count;
         } else {
......
@@ -92,9 +92,14 @@ bool VariableResponse::CopyLodTensorData(
     ::google::protobuf::io::CodedInputStream* input,
     const platform::DeviceContext& ctx, const framework::DDim& dims,
     int length) {
+  auto server_var = GetVar();
+  if (!server_var) {
+    LOG(ERROR) << "recved var should not on current server: "
+               << meta_.varname();
+    return false;
+  }
   auto* tensor = GetVar()->GetMutable<framework::LoDTensor>();
   tensor->Resize(dims);
-
   framework::LoD lod;
   for (int i = 0; i < meta_.lod_level(); ++i) {
     framework::Vector<size_t> v;
@@ -107,7 +112,6 @@ bool VariableResponse::CopyLodTensorData(
   void* tensor_data =
       tensor->mutable_data(ctx.GetPlace(), ToTypeIndex(meta_.data_type()));
-
   if (!ReadRaw(input, ctx, tensor->place(), tensor_data, length)) {
     return false;
   }
......
@@ -50,7 +50,7 @@ class ExtractRowsOp : public framework::OperatorBase {
     auto &in = scope.FindVar(Input("X"))->Get<framework::SelectedRows>();
     auto out = scope.FindVar(Output("Out"))->GetMutable<framework::LoDTensor>();
-    auto &in_rows = in.rows();
+    auto in_rows = in.rows();
     auto out_dim = framework::make_ddim(
         std::vector<int64_t>{static_cast<int64_t>(in_rows.size()), 1});
     auto dst_ptr = out->mutable_data<int64_t>(out_dim, in.place());
......
@@ -60,9 +60,11 @@ struct SelectedRowsAdd<platform::CUDADeviceContext, T> {
     auto out_place = context.GetPlace();
     PADDLE_ENFORCE(platform::is_gpu_place(out_place));
 
-    memory::Copy(boost::get<platform::CUDAPlace>(out_place), out_data,
-                 boost::get<platform::CUDAPlace>(in1_place), in1_data,
-                 in1_value.numel() * sizeof(T), context.stream());
+    memory::Copy(
+        boost::get<platform::CUDAPlace>(out_place), out_data,
+        boost::get<platform::CUDAPlace>(in1_place), in1_data,
+        in1_value.numel() * sizeof(T),
+        reinterpret_cast<const platform::CUDADeviceContext&>(context).stream());
 
     auto* in2_data = in2_value.data<T>();
     memory::Copy(boost::get<platform::CUDAPlace>(out_place),
@@ -107,7 +109,7 @@ struct SelectedRowsAddTensor<platform::CUDADeviceContext, T> {
     PADDLE_ENFORCE_EQ(in1_height, out_dims[0]);
 
     auto& in1_value = input1.value();
-    framework::Vector<int64_t> in1_rows(input1.rows());
+    auto& in1_rows = input1.rows();
 
     int64_t in1_row_numel = in1_value.numel() / in1_rows.size();
     PADDLE_ENFORCE_EQ(in1_row_numel, input2.numel() / in1_height);
@@ -146,7 +148,7 @@ struct SelectedRowsAddTo<platform::CUDADeviceContext, T> {
     auto in1_height = input1.height();
     PADDLE_ENFORCE_EQ(in1_height, input2->height());
 
-    auto& in1_rows = input1.rows();
+    framework::Vector<int64_t> in1_rows(input1.rows());
     auto& in2_rows = *(input2->mutable_rows());
     auto& in1_value = input1.value();
@@ -206,7 +208,7 @@ struct SelectedRowsAddToTensor<platform::CUDADeviceContext, T> {
     PADDLE_ENFORCE_EQ(in1_height, in2_dims[0]);
 
     auto& in1_value = input1.value();
-    framework::Vector<int64_t> in1_rows(input1.rows());
+    auto& in1_rows = input1.rows();
 
     int64_t in1_row_numel = in1_value.numel() / in1_rows.size();
     PADDLE_ENFORCE_EQ(in1_row_numel, input2->numel() / in1_height);
......
@@ -20,7 +20,9 @@ limitations under the License. */
 TEST(selected_rows_functor, gpu_add) {
   paddle::platform::CUDAPlace gpu_place(0);
   paddle::platform::CPUPlace cpu_place;
-  paddle::platform::CUDADeviceContext ctx(gpu_place);
+  paddle::platform::CUDADeviceContext& ctx =
+      *reinterpret_cast<paddle::platform::CUDADeviceContext*>(
+          paddle::platform::DeviceContextPool::Instance().Get(gpu_place));
   paddle::operators::math::SetConstant<paddle::platform::CUDADeviceContext,
                                        float>
       functor;
@@ -132,7 +134,9 @@ TEST(selected_rows_functor, gpu_add) {
 TEST(selected_rows_functor, gpu_add_to) {
   paddle::platform::CUDAPlace gpu_place(0);
   paddle::platform::CPUPlace cpu_place;
-  paddle::platform::CUDADeviceContext ctx(gpu_place);
+  paddle::platform::CUDADeviceContext& ctx =
+      *reinterpret_cast<paddle::platform::CUDADeviceContext*>(
+          paddle::platform::DeviceContextPool::Instance().Get(gpu_place));
   paddle::operators::math::SetConstant<paddle::platform::CUDADeviceContext,
                                        float>
       functor;
......
@@ -123,6 +123,7 @@ class SumKernel : public framework::OpKernel<T> {
       out_value->Resize(framework::make_ddim(in_dim));
       out_value->mutable_data<T>(context.GetPlace());
+
       // if all the input sparse vars are empty, no need to
       // merge these vars.
       if (first_dim == 0UL) {
......
@@ -36,7 +36,9 @@ void BindConstValue(pybind11::module* m) {
       .value("Backward", framework::OpRole::kBackward)
       .value("Optimize", framework::OpRole::kOptimize)
       .value("Loss", framework::OpRole::kLoss)
-      .value("RPC", framework::OpRole::kRPC);
+      .value("RPC", framework::OpRole::kRPC)
+      .value("Dist", framework::OpRole::kDist)
+      .value("LRSched", framework::OpRole::kLRSched);
 
   op_proto_and_checker_maker.def(
       "kOpRoleAttrName", framework::OpProtoAndCheckerMaker::OpRoleAttrName);
......
@@ -1509,6 +1509,30 @@ class Program(object):
         self._op_role_var = []
         self._current_role = OpRole.Forward
 
+    @contextlib.contextmanager
+    def _lr_schedule_guard(self):
+        """
+        A with guard to set :code:`LRSched` :code:`OpRole` and
+        :code:`OpRoleVar` automatically. The :code:`OpRoleVar` is
+        set to the target learning rate.
+
+        Notes: This is a very low level API. Users should not use it directly.
+
+        Examples:
+            >>> p, g = backward(...)
+            >>> with program.lr_schedule_guard():
+            >>>     lr = lr * decay
+        """
+        OpRole = core.op_proto_and_checker_maker.OpRole
+        self._current_role = OpRole.LRSched
+        # TODO(typhoonzero): how to set target learning rate var
+        self._op_role_var = []
+        yield
+        self._op_role_var = []
+        self._current_role = OpRole.Forward
+
     def __str__(self):
         """
         Get the protobuf debug string of this Program.
......
@@ -74,7 +74,7 @@ class Initializer(object):
     directly, but need to use one of its implementations.
     """
 
-    def __init_(self):
+    def __init__(self):
         pass
 
     def __call__(self, param, block):
@@ -293,7 +293,7 @@ class TruncatedNormalInitializer(Initializer):
         assert loc is not None
         assert scale is not None
         assert seed is not None
-        super(NormalInitializer, self).__init__()
+        super(TruncatedNormalInitializer, self).__init__()
         self._mean = loc
         self._std_dev = scale
         self._seed = seed
......
@@ -27,7 +27,7 @@ from . import nn
 from . import ops
 from . import tensor
 from ..initializer import init_on_cpu
-from ..framework import default_main_program, Parameter
+from ..framework import default_main_program, Parameter, unique_name
 
 __all__ = [
     'exponential_decay', 'natural_exp_decay', 'inverse_time_decay',
@@ -63,6 +63,7 @@ def noam_decay(d_model, warmup_steps):
     Returns:
         The decayed learning rate.
     """
-    global_step = _decay_step_counter(1)
-
-    a = global_step**-0.5
+    with default_main_program()._lr_schedule_guard():
+        global_step = _decay_step_counter(1)
+
+        a = global_step**-0.5
@@ -108,6 +109,7 @@ def exponential_decay(learning_rate, decay_steps, decay_rate, staircase=False):
         sgd_optimizer.minimize(avg_cost)
 
     """
-    global_step = _decay_step_counter()
-
-    div_res = global_step / decay_steps
+    with default_main_program()._lr_schedule_guard():
+        global_step = _decay_step_counter()
+
+        div_res = global_step / decay_steps
@@ -136,6 +138,7 @@ def natural_exp_decay(learning_rate, decay_steps, decay_rate, staircase=False):
     Returns:
         The decayed learning rate
     """
-    global_step = _decay_step_counter()
-
-    div_res = global_step / decay_steps
+    with default_main_program()._lr_schedule_guard():
+        global_step = _decay_step_counter()
+
+        div_res = global_step / decay_steps
@@ -181,6 +184,7 @@ def inverse_time_decay(learning_rate, decay_steps, decay_rate, staircase=False):
                 staircase=True))
         sgd_optimizer.minimize(avg_cost)
     """
-    global_step = _decay_step_counter()
-
-    div_res = global_step / decay_steps
+    with default_main_program()._lr_schedule_guard():
+        global_step = _decay_step_counter()
+
+        div_res = global_step / decay_steps
@@ -220,12 +224,15 @@ def polynomial_decay(learning_rate,
     Returns:
         Variable: The decayed learning rate
     """
-    global_step = _decay_step_counter()
-
-    if cycle:
-        div_res = ops.ceil(global_step / decay_steps)
-        zero_var = tensor.fill_constant(shape=[1], dtype='float32', value=0.0)
-        one_var = tensor.fill_constant(shape=[1], dtype='float32', value=1.0)
-
-        with control_flow.Switch() as switch:
-            with switch.case(global_step == zero_var):
+    with default_main_program()._lr_schedule_guard():
+        global_step = _decay_step_counter()
+
+        if cycle:
+            div_res = ops.ceil(global_step / decay_steps)
+            zero_var = tensor.fill_constant(
+                shape=[1], dtype='float32', value=0.0)
+            one_var = tensor.fill_constant(
+                shape=[1], dtype='float32', value=1.0)
+
+            with control_flow.Switch() as switch:
+                with switch.case(global_step == zero_var):
@@ -266,7 +273,7 @@ def piecewise_decay(boundaries, values):
     """
-    if len(values) - len(boundaries) != 1:
-        raise ValueError("len(values) - len(boundaries) should be 1")
+    with default_main_program()._lr_schedule_guard():
+        if len(values) - len(boundaries) != 1:
+            raise ValueError("len(values) - len(boundaries) should be 1")
@@ -291,7 +298,9 @@ def piecewise_decay(boundaries, values):
-            with switch.case(global_step < boundary_val):
-                tensor.assign(value_var, lr)
-        last_value_var = tensor.fill_constant(
-            shape=[1], dtype='float32', value=float(values[len(values) - 1]))
-        with switch.default():
-            tensor.assign(last_value_var, lr)
+                with switch.case(global_step < boundary_val):
+                    tensor.assign(value_var, lr)
+            last_value_var = tensor.fill_constant(
+                shape=[1],
+                dtype='float32',
+                value=float(values[len(values) - 1]))
+            with switch.default():
+                tensor.assign(last_value_var, lr)
......
@@ -80,7 +80,8 @@ if(WITH_DISTRIBUTE)
         py_test_modules(test_dist_se_resnext MODULES test_dist_se_resnext SERIAL)
     endif(NOT APPLE)
     py_test_modules(test_dist_transpiler MODULES test_dist_transpiler)
-    py_test_modules(test_dist_transformer MODULES test_dist_transformer SERIAL)
+    #FIXME(gongwb): random fails.
+    #py_test_modules(test_dist_transformer MODULES test_dist_transformer SERIAL)
 endif()
 py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SERIAL)
 py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL)
......
@@ -345,7 +345,7 @@ class OpTest(unittest.TestCase):
                         actual_t, expect_t, atol=atol, equal_nan=equal_nan),
                     "Output (" + out_name + ") has diff at " + str(place) +
                     "\nExpect " + str(expect_t) + "\n" + "But Got" +
-                    str(actual_t) + " in class " + self.__class__.__name__)
+                    str(actual_t))
                 if isinstance(expect, tuple):
                     self.assertListEqual(actual.recursive_sequence_lengths(),
                                          expect[1], "Output (" + out_name +
......
@@ -20,7 +20,6 @@ import six
 import sys
 import collections
 import math
-import paddle.fluid as fluid
 
 from op_test import OpTest
@@ -33,7 +32,7 @@ class TestDetectionMAPOp(OpTest):
         self.detect = np.array(self.detect).astype('float32')
         self.mAP = np.array(self.mAP).astype('float32')
 
-        if len(self.class_pos_count) > 0:
+        if (len(self.class_pos_count) > 0):
             self.class_pos_count = np.array(self.class_pos_count).astype(
                 'int32')
             self.true_pos = np.array(self.true_pos).astype('float32')
@@ -274,7 +273,7 @@ class TestDetectionMAPOp11Point(TestDetectionMAPOp):
 class TestDetectionMAPOpMultiBatch(TestDetectionMAPOp):
     def init_test_case(self):
         super(TestDetectionMAPOpMultiBatch, self).init_test_case()
-        self.class_pos_count = [0, 2, 1, 0]
+        self.class_pos_count = [0, 2, 1]
         self.true_pos_lod = [[0, 3, 2]]
         self.true_pos = [[0.7, 1.], [0.3, 0.], [0.2, 1.], [0.8, 0.], [0.1, 1.]]
         self.false_pos_lod = [[0, 3, 2]]
......
@@ -22,7 +22,7 @@ class TestDistMnist2x2(TestDistBase):
         self._sync_mode = True
         self._use_reduce = False
 
-    def test_se_resnext(self):
+    def test_dist_train(self):
         self.check_with_place("dist_mnist.py", delta=1e-7)
@@ -31,7 +31,7 @@ class TestDistMnist2x2WithMemopt(TestDistBase):
         self._sync_mode = True
         self._mem_opt = True
 
-    def test_se_resnext(self):
+    def test_dist_train(self):
         self.check_with_place("dist_mnist.py", delta=1e-7)
@@ -40,7 +40,7 @@ class TestDistMnistAsync(TestDistBase):
         self._sync_mode = False
         self._use_reduce = False
 
-    def test_se_resnext(self):
+    def test_dist_train(self):
         self.check_with_place("dist_mnist.py", delta=200)
......
@@ -21,7 +21,16 @@ class TestDistSeResneXt2x2(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
 
-    def test_se_resnext(self):
+    def test_dist_train(self):
+        self.check_with_place("dist_se_resnext.py", delta=1e-7)
+
+
+class TestDistseResnXt2x2WithMemopt(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._mem_opt = True
+
+    def test_dist_train(self):
         self.check_with_place("dist_se_resnext.py", delta=1e-7)
@@ -29,7 +38,7 @@ class TestDistSeResneXt2x2Async(TestDistBase):
     def _setup_config(self):
         self._sync_mode = False
 
-    def test_se_resnext(self):
+    def test_dist_train(self):
         self.check_with_place("dist_se_resnext.py", delta=100)
......
@@ -59,7 +59,7 @@ class TestDistTransformer2x2Sync(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
 
-    def test_transformer(self):
+    def test_dist_train(self):
         download_files()
         self.check_with_place("dist_transformer.py", delta=1e-5)
@@ -68,7 +68,7 @@ class TestDistTransformer2x2Async(TestDistBase):
     def _setup_config(self):
         self._sync_mode = False
 
-    def test_transformer(self):
+    def test_dist_train(self):
         download_files()
         self.check_with_place("dist_transformer.py", delta=1.0)
......
@@ -17,19 +17,28 @@ import unittest
 from test_dist_base import TestDistBase
 
 
-class TestDistSeResneXt2x2(TestDistBase):
+class TestDistW2V2x2(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
 
-    def test_se_resnext(self):
+    def test_dist_train(self):
         self.check_with_place("dist_word2vec.py", delta=1e-4)
 
 
-class TestDistSeResneXt2x2Async(TestDistBase):
+class TestDistW2V2x2WithMemOpt(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._mem_opt = True
+
+    def test_dist_train(self):
+        self.check_with_place("dist_word2vec.py", delta=1e-4)
+
+
+class TestDistW2V2x2Async(TestDistBase):
     def _setup_config(self):
         self._sync_mode = False
 
-    def test_se_resnext(self):
+    def test_dist_train(self):
         self.check_with_place("dist_word2vec.py", delta=1)
......
@@ -21,13 +21,12 @@ import paddle
 
 def delete_ops(block, ops):
-    try:
-        start = list(block.ops).index(ops[0])
-        end = list(block.ops).index(ops[-1])
-        [block._remove_op(start) for _ in six.moves.range(end - start + 1)]
-    except Exception as e:
-        raise e
-    block.program._sync_with_cpp()
+    for op in ops:
+        try:
+            idx = list(block.ops).index(op)
+            block._remove_op(idx)
+        except Exception as e:
+            print(e)
 
 
 def find_op_by_input_arg(block, arg_name):
@@ -37,7 +36,15 @@ def find_op_by_input_arg(block, arg_name):
     return -1
 
 
-def find_op_by_output_arg(block, arg_name):
-    for index, op in enumerate(block.ops):
-        if arg_name in op.output_arg_names:
-            return index
+def find_op_by_output_arg(block, arg_name, reverse=False):
+    if reverse:
+        pos = len(block.ops) - 1
+        while pos >= 0:
+            op = block.ops[pos]
+            if arg_name in op.output_arg_names:
+                return pos
+            pos -= 1
+    else:
+        for index, op in enumerate(block.ops):
+            if arg_name in op.output_arg_names:
+                return index
......
...@@ -50,6 +50,15 @@ OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName() ...@@ -50,6 +50,15 @@ OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
RPC_OP_ROLE_ATTR_NAME = op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName( RPC_OP_ROLE_ATTR_NAME = op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName(
) )
RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC
DIST_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Dist
LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched
PRINT_LOG = False
def log(*args):
if PRINT_LOG:
print(args)
class VarBlock: class VarBlock:
...@@ -127,6 +136,7 @@ class DistributeTranspilerConfig(object): ...@@ -127,6 +136,7 @@ class DistributeTranspilerConfig(object):
slice_var_up = True slice_var_up = True
split_method = None split_method = None
min_block_size = 8192 min_block_size = 8192
print_log = False
class DistributeTranspiler(object): class DistributeTranspiler(object):
...@@ -174,6 +184,9 @@ class DistributeTranspiler(object): ...@@ -174,6 +184,9 @@ class DistributeTranspiler(object):
if self.config.split_method is None: if self.config.split_method is None:
self.config.split_method = RoundRobin self.config.split_method = RoundRobin
global PRINT_LOG
if self.config.print_log:
PRINT_LOG = True
assert (self.config.min_block_size >= 8192) assert (self.config.min_block_size >= 8192)
assert (self.config.split_method.__bases__[0] == PSDispatcher) assert (self.config.split_method.__bases__[0] == PSDispatcher)
...@@ -257,12 +270,12 @@ class DistributeTranspiler(object): ...@@ -257,12 +270,12 @@ class DistributeTranspiler(object):
splited_grad_varname = grad_varname splited_grad_varname = grad_varname
if len(splited_vars) == 1: if len(splited_vars) == 1:
splited_grad_varname = splited_vars[0].name splited_grad_varname = splited_vars[0].name
index = find_op_by_output_arg(program.global_block(), index = find_op_by_output_arg(
splited_grad_varname) program.global_block(), splited_grad_varname, reverse=True)
elif len(splited_vars) > 1: elif len(splited_vars) > 1:
orig_var = program.global_block().vars[splited_grad_varname] orig_var = program.global_block().vars[splited_grad_varname]
index = find_op_by_output_arg(program.global_block(), index = find_op_by_output_arg(
splited_grad_varname) program.global_block(), splited_grad_varname, reverse=True)
self._insert_split_op(program, orig_var, index, splited_vars) self._insert_split_op(program, orig_var, index, splited_vars)
index += 1 index += 1
else: else:
...@@ -301,7 +314,7 @@ class DistributeTranspiler(object): ...@@ -301,7 +314,7 @@ class DistributeTranspiler(object):
self.grad_name_to_send_dummy_out[ self.grad_name_to_send_dummy_out[
self.table_name] = program.global_block().create_var( self.table_name] = program.global_block().create_var(
name=framework.generate_control_dev_var_name()) name=framework.generate_control_dev_var_name())
input_deps = self.grad_name_to_send_dummy_out.values() input_deps = list(self.grad_name_to_send_dummy_out.values())
program.global_block().append_op( program.global_block().append_op(
type="send_barrier", type="send_barrier",
...@@ -377,7 +390,10 @@ class DistributeTranspiler(object): ...@@ -377,7 +390,10 @@ class DistributeTranspiler(object):
type="concat", type="concat",
inputs={"X": splited_var}, inputs={"X": splited_var},
outputs={"Out": [orig_param]}, outputs={"Out": [orig_param]},
attrs={"axis": 0}) attrs={
"axis": 0,
RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE
})
self._get_trainer_startup_program(recv_vars=recv_vars, eplist=eplist) self._get_trainer_startup_program(recv_vars=recv_vars, eplist=eplist)
@@ -496,9 +512,9 @@ class DistributeTranspiler(object):
         # NOTE: assume blocks of the same variable is not distributed
         # on the same pserver, only change param/grad varnames for
         # trainers to fetch.
-        sys.stderr.write("get_pserver_program() is deprecated, call\
-get_pserver_programs() to get pserver main and startup\
-in a single call.")
+        sys.stderr.write("get_pserver_program() is deprecated, call \
+get_pserver_programs() to get pserver main and startup \
+in a single call.")
         # step1
         pserver_program = Program()
         pserver_program.random_seed = self.origin_program.random_seed
@@ -615,22 +631,31 @@ class DistributeTranspiler(object):
         for idx, opt_op in enumerate(opt_op_on_pserver):
             per_opt_block = pserver_program._create_block(pre_block_idx)
             optimize_blocks.append(per_opt_block)
+            optimize_target_param_name = opt_op.attr(OP_ROLE_VAR_ATTR_NAME)[0]
             # append grad merging ops before clip and weight decay
-            # cases may like:
-            # L2Decay op -> clip op -> optimize
+            # e.g. merge grad -> L2Decay op -> clip op -> optimize
+            merged_var = None
             for _, op in enumerate(self.optimize_ops):
-                # find the origin @GRAD var before clipping
-                grad_varname_for_block = __op_have_grad_input__(op)
-                if ufind.is_connected(op, opt_op) and grad_varname_for_block:
+                # find the origin grad var before clipping/L2Decay,
+                # merged_var should be the input var name of L2Decay
+                grad_varname_for_block = op.attr(OP_ROLE_VAR_ATTR_NAME)[1]
+                if op.attr(OP_ROLE_VAR_ATTR_NAME)[
+                        0] == optimize_target_param_name:
                     merged_var = self._append_pserver_grad_merge_ops(
                         per_opt_block, grad_varname_for_block, endpoint,
                         grad_to_block_id, self.origin_program)
-                if merged_var:
                     break  # append optimize op once then append other ops.
-            for _, op in enumerate(self.optimize_ops):
-                # optimizer is connected to itself
-                if ufind.is_connected(op, opt_op) and op not in global_ops:
-                    __append_optimize_op__(op, per_opt_block, grad_to_block_id,
-                                           merged_var, lr_ops)
+            if merged_var:
+                for _, op in enumerate(self.optimize_ops):
+                    # optimizer is connected to itself
+                    if op.attr(OP_ROLE_VAR_ATTR_NAME)[0] == optimize_target_param_name and \
+                            op not in global_ops:
+                        log("append opt op: ", op.type, op.input_arg_names,
+                            merged_var)
+                        __append_optimize_op__(op, per_opt_block,
+                                               grad_to_block_id, merged_var,
+                                               lr_ops)

         # dedup grad to ids list
         grad_to_block_id = list(set(grad_to_block_id))
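Note: the rewritten loop drops the ufind connectivity check and instead pairs each op with its target parameter through the op_role_var attribute, which stores [param_name, grad_name]. The grouping rule in isolation (names taken from the diff):

    def ops_for_param(optimize_ops, param_name):
        # All optimize-role ops (clip, L2Decay, sgd, ...) that update
        # the same parameter carry its name in op_role_var[0].
        return [
            op for op in optimize_ops
            if op.attr(OP_ROLE_VAR_ATTR_NAME) and
            op.attr(OP_ROLE_VAR_ATTR_NAME)[0] == param_name
        ]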
@@ -726,17 +751,17 @@ class DistributeTranspiler(object):
         Returns:
             Program: parameter server side startup program.
         """
-        sys.stderr.write("get_startup_program() is deprecated, call\
-get_pserver_programs() to get pserver main and startup\
-in a single call.")
+        sys.stderr.write("get_startup_program() is deprecated, call \
+get_pserver_programs() to get pserver main and startup \
+in a single call.")
         if pserver_program != None:
-            sys.stderr.write("passing pserver_program to get_startup_program()\
-is deprecated, you can use new API get_pserver_programs() to\
-get both pserver main program and startup program.")
+            sys.stderr.write("passing pserver_program to get_startup_program() \
+is deprecated, you can use new API get_pserver_programs() to \
+get both pserver main program and startup program.")
         if startup_program != None:
-            sys.stderr.write("passing startup_program to get_startup_program()\
-is deprecated, use fluid.program_guard() or pass this argument\
-to transpile() call.")
+            sys.stderr.write("passing startup_program to get_startup_program() \
+is deprecated, use fluid.program_guard() or pass this argument \
+to transpile() call.")

         s_prog = Program()
         orig_s_prog = self.startup_program
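Note: the only change in these deprecation messages is a space added before each trailing backslash. A backslash-newline inside a string literal is a line continuation that inserts nothing, so the old messages printed words run together:

    s = "deprecated, call\
get_pserver_programs()"
    # s == "deprecated, callget_pserver_programs()" -- no separator;
    # any indent on the continuation line would leak into the string
    # too, which is why these literals continue at column zero.

With the space before the backslash the message reads "... call get_pserver_programs() ...".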
@@ -1302,7 +1327,10 @@ class DistributeTranspiler(object):
                 type="split_selected_rows",
                 inputs={"X": orig_var},
                 outputs={"Out": splited_vars},
-                attrs={"height_sections": height_sections})
+                attrs={
+                    "height_sections": height_sections,
+                    RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE
+                })
         elif orig_var.type == core.VarDesc.VarType.LOD_TENSOR:
             sections = []
             for v in splited_vars:
@@ -1312,8 +1340,10 @@ class DistributeTranspiler(object):
                 type="split_byref",
                 inputs={"X": orig_var},
                 outputs={"Out": splited_vars},
-                attrs={"sections": sections}  # assume split evenly
-            )
+                attrs={
+                    "sections": sections,
+                    RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE
+                })
         else:
             AssertionError("Variable type should be in set "
                            "[LOD_TENSOR, SELECTED_ROWS]")
@@ -1381,15 +1411,15 @@ class DistributeTranspiler(object):
         if not grad_block:
             # do not append this op if current endpoint
             # is not dealing with this grad block
-            return
+            return None
         orig_varname, block_name, trainer_name = self._get_varname_parts(
             grad_block.name)
         if block_name:
             merged_var_name = '.'.join([orig_varname, block_name])
         else:
             merged_var_name = orig_varname
-        merged_var = \
-            pserver_block.vars[merged_var_name]
+
+        merged_var = pserver_block.vars[merged_var_name]
         grad_to_block_id.append(merged_var.name + ":" + str(optimize_block.idx))
         if self.sync_mode and self.trainer_num > 1:
             vars2merge = []
@@ -1473,7 +1503,6 @@ class DistributeTranspiler(object):
         outputs = self._get_output_map_from_op(
             self.origin_program.global_block().vars, opt_op)
         outputs["ParamOut"] = new_inputs["Param"]
-
         optimize_block.append_op(
             type=opt_op.type,
             inputs=new_inputs,
@@ -1618,6 +1647,16 @@ class DistributeTranspiler(object):
         return iomap

     def _get_lr_ops(self):
+        lr_ops = []
+        block = self.origin_program.global_block()
+        for op in block.ops:
+            if int(op.attr(RPC_OP_ROLE_ATTR_NAME)) == int(
+                    LR_SCHED_OP_ROLE_ATTR_VALUE):
+                lr_ops.append(op)
+                log("append lr op: ", op.type)
+        return lr_ops
+
+    def _get_lr_ops_deprecated(self):
         lr_ops = []
         # find learning rate variables by optimize op
         lr_vars = set()
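Note: the new _get_lr_ops keeps every op whose integer role attribute equals the LR-schedule value, replacing the old variable-tracing heuristic (kept as _get_lr_ops_deprecated). A self-contained toy of the contract, with an illustrative attribute name and value (the real constants come from the framework):

    OP_ROLE = "op_role"
    LR_SCHED = 16  # illustrative value

    class FakeOp(object):
        def __init__(self, op_type, role):
            self.type = op_type
            self._attrs = {OP_ROLE: role}

        def attr(self, name):
            return self._attrs[name]

    ops = [FakeOp("sgd", 2), FakeOp("exponential_decay", LR_SCHED)]
    lr_ops = [op for op in ops if int(op.attr(OP_ROLE)) == int(LR_SCHED)]
    assert [op.type for op in lr_ops] == ["exponential_decay"]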
@@ -1670,20 +1709,21 @@ class DistributeTranspiler(object):
         block = self.origin_program.global_block()
         opt_ops = []
         params_grads = []
+        # tmp set to dedup
+        optimize_params = set()
         origin_var_dict = self.origin_program.global_block().vars
         for op in block.ops:
             if self._is_opt_role_op(op):
                 opt_ops.append(op)
-                # HACK(wuyi): if we find grad vars from input of optimize
-                # ops, we may get the output of clip op. Use syntax "@GRAD"
-                # and op_role_var to get the pair.
-                for input_name in op.input_arg_names:
-                    if input_name.find("@GRAD") != -1 and \
-                        op.attr(RPC_OP_ROLE_ATTR_NAME):
-                        param_name = op.attr(OP_ROLE_VAR_ATTR_NAME)[0]
+                if op.attr(OP_ROLE_VAR_ATTR_NAME):
+                    param_name = op.attr(OP_ROLE_VAR_ATTR_NAME)[0]
+                    grad_name = op.attr(OP_ROLE_VAR_ATTR_NAME)[1]
+                    if not param_name in optimize_params:
+                        optimize_params.add(param_name)
+                        log("adding param_grad pair: ", param_name, grad_name)
                         params_grads.append([
                             origin_var_dict[param_name],
-                            origin_var_dict[input_name]
+                            origin_var_dict[grad_name]
                         ])
             else:
                 pass
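Note: several optimize-role ops (e.g. a clip op and the sgd op behind it) can carry the same [param, grad] pair, so the temporary optimize_params set keeps only the first occurrence per parameter. The dedup in isolation:

    pairs = [("w", "w@GRAD"), ("w", "w@GRAD"), ("b", "b@GRAD")]
    seen, params_grads = set(), []
    for param, grad in pairs:
        if param not in seen:
            seen.add(param)
            params_grads.append((param, grad))
    assert params_grads == [("w", "w@GRAD"), ("b", "b@GRAD")]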
...
@@ -14,10 +14,10 @@

 from __future__ import print_function

-from collections import defaultdict
+from collections import defaultdict, OrderedDict, Callable
 from .. import core
 from ... import compat as cpt
-from ..framework import Program, default_main_program, Parameter
+from ..framework import Program, default_main_program, Parameter, Variable
 from ..backward import _rename_arg_
 from functools import reduce
 from six.moves import range
@@ -113,8 +113,10 @@ class ControlFlowGraph(object):
     def _fill_pool(self, i, is_forward):
         block_desc = self._ops[i].block()
         in_diff, _ = self._get_diff(self._live_in[i], self._live_out[i])
+        # NOTE: must sort the in_diff set for cases that get different cache var.
+        # FIXME(typhoonzero): maybe use a "sorted set" is better than this.
         can_optimize = [
-            x for x in in_diff
+            x for x in sorted(list(in_diff))
             if self._check_var_validity(block_desc, x, is_forward)
         ]
         if can_optimize:
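Note: Python gives no ordering guarantee for set iteration, so without the sort two otherwise identical runs could hand out different cache variables and transpile to different programs. Sorting the candidates makes the choice reproducible:

    candidates = {"fc_0.tmp_1", "fc_0.tmp_0", "conv2d_0.tmp_0"}
    next(iter(candidates))    # arbitrary; may differ between processes
    sorted(candidates)[0]     # always "conv2d_0.tmp_0"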
@@ -220,8 +222,9 @@ class ControlFlowGraph(object):
             block_desc = op.block()
             is_forward = i < self._forward_num
             if self.pool:
+                # NOTE: must sort the in_diff set for cases that get different cache var.
                 defs_can_optimize = [
-                    x for x in self._defs[i]
+                    x for x in sorted(list(self._defs[i]))
                     if self._check_var_validity(block_desc, x, is_forward)
                 ]
                 out_pair = [
@@ -271,6 +274,8 @@ class ControlFlowGraph(object):
                         self._program.block(block_desc.id).var(cpt.to_text(
                             x)).desc = self._find_var(block_desc, cache_var,
                                                       is_forward)
+                        self._program.block(block_desc.id).vars[cpt.to_text(x)] = \
+                            Variable(self._program.block(block_desc.id), name=cpt.to_text(x))
                         self._update_graph(x, cache_var, begin_idx=i)
                         break
             self._fill_pool(i, is_forward)
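Note: swapping a var's desc mutates the C++ side only; the Python Block.vars dict still holds the wrapper built around the old state, so the hunk re-registers a fresh Variable under the reused name. A self-contained sketch of the stale-wrapper hazard this plausibly guards against (toy classes, not the framework's):

    class Desc(object):
        def __init__(self, shape):
            self.shape = shape

    class VarWrapper(object):
        def __init__(self, desc):
            self.desc = desc
            self.shape = desc.shape  # snapshot taken at construction

    v = VarWrapper(Desc([8, 8]))
    v.desc = Desc([4, 8])          # swap the desc only
    assert v.shape == [8, 8]       # stale snapshot
    v = VarWrapper(v.desc)         # rebuild, as the added lines do
    assert v.shape == [4, 8]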
...