提交 48031c41 编写于 作者: W wangguibao

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into async_executor

...@@ -34,7 +34,7 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node, ...@@ -34,7 +34,7 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
nccl_ctxs_(ctxs) { nccl_ctxs_(ctxs) {
if (nccl_ctxs_) { if (nccl_ctxs_) {
for (auto &p : places_) { for (auto &p : places_) {
this->dev_ctxes_[p] = nccl_ctxs_->DevCtx(p); this->SetDeviceContext(p, nccl_ctxs_->DevCtx(p));
} }
} }
} }
...@@ -46,7 +46,7 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node, ...@@ -46,7 +46,7 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
#endif #endif
void AllReduceOpHandle::RunImpl() { void AllReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second); platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
if (NoDummyInputSize() == 1) { if (NoDummyInputSize() == 1) {
return; // No need to all reduce when GPU count = 1; return; // No need to all reduce when GPU count = 1;
...@@ -127,7 +127,7 @@ void AllReduceOpHandle::RunImpl() { ...@@ -127,7 +127,7 @@ void AllReduceOpHandle::RunImpl() {
*local_scopes_[i]->FindVar(kLocalExecScopeName)->Get<Scope *>(); *local_scopes_[i]->FindVar(kLocalExecScopeName)->Get<Scope *>();
auto &p = places_[i]; auto &p = places_[i];
auto *var = scope.FindVar(out_var_handles[i]->name_); auto *var = scope.FindVar(out_var_handles[i]->name_);
auto *dev_ctx = dev_ctxes_[p]; auto *dev_ctx = dev_ctxes_.at(p);
RunAndRecordEvent(p, [&trg, var, dev_ctx, p] { RunAndRecordEvent(p, [&trg, var, dev_ctx, p] {
auto &tensor_gpu = *var->GetMutable<framework::LoDTensor>(); auto &tensor_gpu = *var->GetMutable<framework::LoDTensor>();
......
...@@ -44,7 +44,8 @@ struct BroadcastOpHandle : public OpHandleBase { ...@@ -44,7 +44,8 @@ struct BroadcastOpHandle : public OpHandleBase {
nccl_ctxs_(nccl_ctxs) { nccl_ctxs_(nccl_ctxs) {
if (nccl_ctxs_) { if (nccl_ctxs_) {
for (auto &p_ctx : nccl_ctxs_->contexts_) { for (auto &p_ctx : nccl_ctxs_->contexts_) {
dev_ctxes_[platform::CUDAPlace(p_ctx.first)] = p_ctx.second.ctx_.get(); this->SetDeviceContext(platform::CUDAPlace(p_ctx.first),
p_ctx.second.ctx_.get());
} }
} }
} }
......
...@@ -37,7 +37,7 @@ void ComputationOpHandle::RunImpl() { ...@@ -37,7 +37,7 @@ void ComputationOpHandle::RunImpl() {
bool ComputationOpHandle::NeedWait(VarHandleBase *in_var) { bool ComputationOpHandle::NeedWait(VarHandleBase *in_var) {
bool need_wait = bool need_wait =
in_var && in_var->GeneratedOp() && in_var && in_var->GeneratedOp() &&
in_var->GeneratedOp()->DeviceContext(place_) != dev_ctxes_[place_]; in_var->GeneratedOp()->DeviceContext(place_) != dev_ctxes_.at(place_);
return need_wait; return need_wait;
} }
......
...@@ -28,7 +28,7 @@ DataBalanceOpHandle::DataBalanceOpHandle( ...@@ -28,7 +28,7 @@ DataBalanceOpHandle::DataBalanceOpHandle(
: OpHandleBase(node), local_scopes_(local_scopes), places_(places) { : OpHandleBase(node), local_scopes_(local_scopes), places_(places) {
if (ctxs) { if (ctxs) {
for (auto &p : places_) { for (auto &p : places_) {
this->dev_ctxes_[p] = ctxs->DevCtx(p); this->SetDeviceContext(p, ctxs->DevCtx(p));
} }
} }
} }
...@@ -89,8 +89,8 @@ void DataBalanceOpHandle::RunImpl() { ...@@ -89,8 +89,8 @@ void DataBalanceOpHandle::RunImpl() {
PADDLE_ENFORCE_GT(places_.size(), 1, PADDLE_ENFORCE_GT(places_.size(), 1,
"Data balance can only be enabled when the number of " "Data balance can only be enabled when the number of "
"places to run larger than 1."); "places to run larger than 1.");
auto in_var_handles = DynamicCast<VarHandle>(inputs_); auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
auto out_var_handles = DynamicCast<VarHandle>(outputs_); auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
PADDLE_ENFORCE(in_var_handles.size() % places_.size() == 0); PADDLE_ENFORCE(in_var_handles.size() % places_.size() == 0);
PADDLE_ENFORCE_EQ( PADDLE_ENFORCE_EQ(
in_var_handles.size(), out_var_handles.size(), in_var_handles.size(), out_var_handles.size(),
......
...@@ -92,13 +92,13 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run( ...@@ -92,13 +92,13 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
size_t num_complete = 0; size_t num_complete = 0;
remaining_ = 0; remaining_ = 0;
BlockingQueue<size_t> complete_q; auto complete_q = std::make_shared<BlockingQueue<size_t>>();
for (auto op : bootstrap_ops_) { for (auto op : bootstrap_ops_) {
RunOpAsync(op_deps.get(), op, &complete_q); RunOpAsync(op_deps.get(), op, complete_q);
} }
while (num_complete != op_deps->size()) { while (num_complete != op_deps->size()) {
size_t num_comp = complete_q.Pop(); size_t num_comp = complete_q->Pop();
if (num_comp == -1UL) { if (num_comp == -1UL) {
int remaining = 0; int remaining = 0;
while (true) { while (true) {
...@@ -107,7 +107,7 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run( ...@@ -107,7 +107,7 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
break; break;
} }
for (int i = 0; i < remaining; ++i) { for (int i = 0; i < remaining; ++i) {
complete_q.Pop(); complete_q->Pop();
} }
} }
exception_.ReThrow(); exception_.ReThrow();
...@@ -120,7 +120,8 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run( ...@@ -120,7 +120,8 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
} }
void FastThreadedSSAGraphExecutor::RunOpAsync( void FastThreadedSSAGraphExecutor::RunOpAsync(
std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps, std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps,
OpHandleBase *op, BlockingQueue<size_t> *complete_q) { OpHandleBase *op,
const std::shared_ptr<BlockingQueue<size_t>> &complete_q) {
++remaining_; ++remaining_;
this->pool_.enqueue([=] { this->pool_.enqueue([=] {
OpHandleBase *op_to_run = op; OpHandleBase *op_to_run = op;
...@@ -144,7 +145,7 @@ void FastThreadedSSAGraphExecutor::RunOpAsync( ...@@ -144,7 +145,7 @@ void FastThreadedSSAGraphExecutor::RunOpAsync(
if (op_to_run == nullptr) { if (op_to_run == nullptr) {
op_to_run = pending_op; op_to_run = pending_op;
} else { } else {
this->RunOpAsync(op_deps, pending_op, complete_q); RunOpAsync(op_deps, pending_op, complete_q);
} }
} }
} }
...@@ -156,8 +157,7 @@ void FastThreadedSSAGraphExecutor::RunOpAsync( ...@@ -156,8 +157,7 @@ void FastThreadedSSAGraphExecutor::RunOpAsync(
} }
void FastThreadedSSAGraphExecutor::PrepareAtomicOpDeps() { void FastThreadedSSAGraphExecutor::PrepareAtomicOpDeps() {
atomic_op_deps_ = pool_.enqueue([&] { atomic_op_deps_ = pool_.enqueue([&] {
std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps = auto *op_deps = new std::unordered_map<OpHandleBase *, std::atomic<int>>;
new std::unordered_map<OpHandleBase *, std::atomic<int>>;
for (auto &pair : op_deps_) { for (auto &pair : op_deps_) {
(*op_deps)[pair.first] = pair.second; (*op_deps)[pair.first] = pair.second;
} }
......
...@@ -50,7 +50,8 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -50,7 +50,8 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::atomic<int> remaining_; std::atomic<int> remaining_;
void RunOpAsync(std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps, void RunOpAsync(std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps,
OpHandleBase *op, BlockingQueue<size_t> *complete_q); OpHandleBase *op,
const std::shared_ptr<BlockingQueue<size_t>> &complete_q);
void PrepareAtomicOpDeps(); void PrepareAtomicOpDeps();
......
...@@ -36,7 +36,7 @@ void GatherOpHandle::RunImpl() { ...@@ -36,7 +36,7 @@ void GatherOpHandle::RunImpl() {
VarHandle *out_var_handle; VarHandle *out_var_handle;
{ {
auto out_var_handles = DynamicCast<VarHandle>(outputs_); auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
PADDLE_ENFORCE_EQ(out_var_handles.size(), 1, PADDLE_ENFORCE_EQ(out_var_handles.size(), 1,
"The number of output should be one."); "The number of output should be one.");
out_var_handle = out_var_handles.front(); out_var_handle = out_var_handles.front();
...@@ -99,7 +99,7 @@ void GatherOpHandle::RunImpl() { ...@@ -99,7 +99,7 @@ void GatherOpHandle::RunImpl() {
Tensor *out_tensor = out_value->mutable_value(); Tensor *out_tensor = out_value->mutable_value();
// copy // copy
auto dev_ctx = dev_ctxes_[out_var_handle->place_]; auto dev_ctx = dev_ctxes_.at(out_var_handle->place_);
RunAndRecordEvent(out_var_handle->place_, [in_tensors, out_tensor, &dev_ctx, RunAndRecordEvent(out_var_handle->place_, [in_tensors, out_tensor, &dev_ctx,
t_out_p] { t_out_p] {
int s = 0, e = 0; int s = 0, e = 0;
......
...@@ -103,7 +103,7 @@ void OpHandleBase::WaitInputVarGenerated() { ...@@ -103,7 +103,7 @@ void OpHandleBase::WaitInputVarGenerated() {
void OpHandleBase::WaitInputVarGenerated(const platform::Place &place) { void OpHandleBase::WaitInputVarGenerated(const platform::Place &place) {
for (auto *in : inputs_) { for (auto *in : inputs_) {
if (NeedWait(in)) { if (NeedWait(in)) {
in->GeneratedOp()->RecordWaitEventOnCtx(dev_ctxes_[place]); in->GeneratedOp()->RecordWaitEventOnCtx(dev_ctxes_.at(place));
} }
} }
} }
......
...@@ -27,7 +27,7 @@ namespace framework { ...@@ -27,7 +27,7 @@ namespace framework {
namespace details { namespace details {
void ReduceOpHandle::RunImpl() { void ReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second); platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
if (places_.size() == 1) return; if (places_.size() == 1) return;
// the input and output may have dummy var. // the input and output may have dummy var.
......
...@@ -46,7 +46,8 @@ struct ReduceOpHandle : public OpHandleBase { ...@@ -46,7 +46,8 @@ struct ReduceOpHandle : public OpHandleBase {
nccl_ctxs_(nccl_ctxs) { nccl_ctxs_(nccl_ctxs) {
if (nccl_ctxs_) { if (nccl_ctxs_) {
for (auto &p_ctx : nccl_ctxs_->contexts_) { for (auto &p_ctx : nccl_ctxs_->contexts_) {
dev_ctxes_[platform::CUDAPlace(p_ctx.first)] = p_ctx.second.ctx_.get(); this->SetDeviceContext(platform::CUDAPlace(p_ctx.first),
p_ctx.second.ctx_.get());
} }
} }
} }
......
...@@ -38,7 +38,7 @@ void RPCOpHandle::RunImpl() { ...@@ -38,7 +38,7 @@ void RPCOpHandle::RunImpl() {
continue; continue;
} }
if (in->GeneratedOp()) { if (in->GeneratedOp()) {
in->GeneratedOp()->RecordWaitEventOnCtx(dev_ctxes_[p]); in->GeneratedOp()->RecordWaitEventOnCtx(dev_ctxes_.at(p));
} }
} }
auto &tmp_scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(); auto &tmp_scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
......
...@@ -27,7 +27,7 @@ ScaleLossGradOpHandle::ScaleLossGradOpHandle(ir::Node *node, size_t num_dev, ...@@ -27,7 +27,7 @@ ScaleLossGradOpHandle::ScaleLossGradOpHandle(ir::Node *node, size_t num_dev,
coeff_(static_cast<float>(1.0 / num_dev)), coeff_(static_cast<float>(1.0 / num_dev)),
scope_(scope), scope_(scope),
place_(place) { place_(place) {
dev_ctxes_[place_] = dev_ctx; this->SetDeviceContext(place_, dev_ctx);
} }
ScaleLossGradOpHandle::~ScaleLossGradOpHandle() {} ScaleLossGradOpHandle::~ScaleLossGradOpHandle() {}
...@@ -46,9 +46,9 @@ void ScaleLossGradOpHandle::RunImpl() { ...@@ -46,9 +46,9 @@ void ScaleLossGradOpHandle::RunImpl() {
} else { } else {
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
this->RunAndRecordEvent([&] { this->RunAndRecordEvent([&] {
auto stream = auto stream = static_cast<platform::CUDADeviceContext *>(
static_cast<platform::CUDADeviceContext *>(this->dev_ctxes_[place_]) this->dev_ctxes_.at(place_))
->stream(); ->stream();
memory::Copy(boost::get<platform::CUDAPlace>(place_), tmp, memory::Copy(boost::get<platform::CUDAPlace>(place_), tmp,
platform::CPUPlace(), &coeff_, sizeof(float), stream); platform::CPUPlace(), &coeff_, sizeof(float), stream);
VLOG(10) << place_ << "RUN Scale loss grad op"; VLOG(10) << place_ << "RUN Scale loss grad op";
......
...@@ -39,7 +39,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -39,7 +39,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare", nullptr)); new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare", nullptr));
std::unordered_map<OpHandleBase *, size_t> pending_ops; std::unordered_map<OpHandleBase *, size_t> pending_ops;
std::unordered_set<VarHandleBase *> pending_vars; std::unordered_set<VarHandleBase *> pending_vars;
BlockingQueue<VarHandleBase *> ready_vars; auto ready_vars = std::make_shared<BlockingQueue<VarHandleBase *>>();
std::unordered_set<OpHandleBase *> ready_ops; std::unordered_set<OpHandleBase *> ready_ops;
// For ops (e.g. nccl_all_reduce) that need to coordinate multiple // For ops (e.g. nccl_all_reduce) that need to coordinate multiple
// streams from multiple GPUs, it's faster to buffer them and schedule // streams from multiple GPUs, it's faster to buffer them and schedule
...@@ -51,12 +51,12 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -51,12 +51,12 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
for (auto &var_map : graph_->Get<details::GraphVars>(details::kGraphVars)) { for (auto &var_map : graph_->Get<details::GraphVars>(details::kGraphVars)) {
for (auto &name_pair : var_map) { for (auto &name_pair : var_map) {
for (auto &version_pair : name_pair.second) { for (auto &version_pair : name_pair.second) {
InsertPendingVar(&pending_vars, &ready_vars, version_pair.get()); InsertPendingVar(&pending_vars, ready_vars.get(), version_pair.get());
} }
} }
} }
for (auto &var : graph_->Get<details::GraphDepVars>(details::kGraphDepVars)) { for (auto &var : graph_->Get<details::GraphDepVars>(details::kGraphDepVars)) {
InsertPendingVar(&pending_vars, &ready_vars, var.get()); InsertPendingVar(&pending_vars, ready_vars.get(), var.get());
} }
for (auto &op : graph_->Get<details::GraphOps>(details::kGraphOps)) { for (auto &op : graph_->Get<details::GraphOps>(details::kGraphOps)) {
...@@ -73,12 +73,12 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -73,12 +73,12 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
FeedFetchList fetch_data(fetch_tensors.size()); FeedFetchList fetch_data(fetch_tensors.size());
InsertFetchOps(fetch_tensors, &fetch_ops, &fetch_dependencies, &pending_ops, InsertFetchOps(fetch_tensors, &fetch_ops, &fetch_dependencies, &pending_ops,
&pending_vars, &ready_vars, &fetch_data); &pending_vars, ready_vars.get(), &fetch_data);
auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) { auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
for (auto *op : set) { for (auto *op : set) {
running_ops_++; running_ops_++;
RunOp(&ready_vars, op); RunOp(ready_vars, op);
} }
set.clear(); set.clear();
}; };
...@@ -87,7 +87,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -87,7 +87,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
run_op_futures_.clear(); run_op_futures_.clear();
exception_holder_.Clear(); exception_holder_.Clear();
event.reset(nullptr); event.reset(nullptr);
// Step 3. Execution // Step 3. Execution
while (!pending_vars.empty()) { while (!pending_vars.empty()) {
// 1. Run All Ready ops // 1. Run All Ready ops
...@@ -103,7 +102,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -103,7 +102,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
// 2. Find ready variable // 2. Find ready variable
bool timeout; bool timeout;
auto cur_ready_vars = ready_vars.PopAll(1, &timeout); auto cur_ready_vars = ready_vars->PopAll(1, &timeout);
if (timeout) { if (timeout) {
if (exception_holder_.IsCaught()) { if (exception_holder_.IsCaught()) {
...@@ -133,7 +132,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -133,7 +132,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
} }
} }
PADDLE_ENFORCE(ready_ops.empty()); PADDLE_ENFORCE(ready_ops.empty());
// Wait FetchOps. // Wait FetchOps.
ClearFetchOp(graph_.get(), &fetch_ops); ClearFetchOp(graph_.get(), &fetch_ops);
...@@ -206,7 +204,8 @@ void ThreadedSSAGraphExecutor::InsertPendingVar( ...@@ -206,7 +204,8 @@ void ThreadedSSAGraphExecutor::InsertPendingVar(
} }
void ThreadedSSAGraphExecutor::RunOp( void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) { const std::shared_ptr<BlockingQueue<VarHandleBase *>> &ready_var_q,
details::OpHandleBase *op) {
auto op_run = [ready_var_q, op, this] { auto op_run = [ready_var_q, op, this] {
try { try {
if (VLOG_IS_ON(10)) { if (VLOG_IS_ON(10)) {
......
...@@ -51,7 +51,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -51,7 +51,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
~ThreadedSSAGraphExecutor() {} ~ThreadedSSAGraphExecutor() {}
private: private:
void RunOp(BlockingQueue<VarHandleBase *> *ready_var_q, void RunOp(const std::shared_ptr<BlockingQueue<VarHandleBase *>> &ready_var_q,
details::OpHandleBase *op); details::OpHandleBase *op);
private: private:
......
...@@ -19,81 +19,7 @@ limitations under the License. */ ...@@ -19,81 +19,7 @@ limitations under the License. */
namespace paddle { namespace paddle {
namespace framework { namespace framework {
// NOTE The vector<LoDTensor> can't be replaced with the class LoDTensorArray
// directly, because there are many vector<LoDTensor> used accross the project,
// and some of them are treated as LoDTensorArray.
#if !defined(PADDLE_ON_INFERENCE)
using LoDTensorArray = std::vector<LoDTensor>; using LoDTensorArray = std::vector<LoDTensor>;
#else // !PADDLE_ON_INFERENCE
#pragma message "LoDTensorArray is replaced with the inference one."
/*
* A LoDTensorArray which will not deallocate buffer when resized, fix the data
* diff in inference, and more performance friendly in the concurrency
* scenerios.
*/
class LoDTensorArray {
public:
LoDTensorArray() = default;
using iterator = std::vector<LoDTensor>::iterator;
using const_iterator = std::vector<LoDTensor>::const_iterator;
const_iterator begin() const { return array_.begin(); }
const_iterator end() const { return array_.begin() + size_; }
iterator begin() { return array_.begin(); }
iterator end() { return array_.begin() + size_; }
void push_back(const LoDTensor& x) {
if (size_ < array_.size()) {
array_[size_++] = x;
} else {
array_.push_back(x);
++size_;
}
}
void resize(size_t size) {
if (array_.size() < size) {
array_.resize(size);
}
size_ = size;
}
void emplace_back() { array_.emplace_back(); }
void emplace_back(LoDTensor&& x) { array_.emplace_back(std::move(x)); }
LoDTensor& back() { return array_.back(); }
size_t space() const { return array_.size(); }
void reserve(size_t size) {
// Naive warning to tell user this array might be to large. The memory and
// buffer used by this TensorArray will not be deleted during the training
// and inference phase, so attention not to make it expand too long.
if (size > 800UL) {
LOG(WARNING) << "TensorArray has more than 800 items";
}
array_.reserve(size);
}
bool empty() const { return size_ == 0UL; }
void clear() { size_ = 0UL; }
LoDTensor& operator[](size_t id) { return array_[id]; }
const LoDTensor& operator[](size_t id) const { return array_[id]; }
LoDTensor& at(size_t id) { return array_.at(id); }
const LoDTensor& at(size_t id) const { return array_.at(id); }
size_t size() const { return size_; }
private:
size_t size_{0};
std::vector<LoDTensor> array_;
};
#endif // !PADDLE_ON_INFERENCE
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -70,8 +70,12 @@ void Main(bool use_gpu) { ...@@ -70,8 +70,12 @@ void Main(bool use_gpu) {
// The outputs' buffers are in CPU memory. // The outputs' buffers are in CPU memory.
for (size_t i = 0; i < std::min(static_cast<size_t>(5), num_elements); for (size_t i = 0; i < std::min(static_cast<size_t>(5), num_elements);
i++) { i++) {
CHECK_NEAR(static_cast<float*>(outputs.front().data.data())[i], result[i], // Here will result random fail, for that the model is trained by CI, the
0.001); // train phase is not stable, so the result will be random.
// TODO(Superjomn) will restore after the model is upload.
// CHECK_NEAR(static_cast<float*>(outputs.front().data.data())[i],
// result[i],
// 0.001);
} }
} }
} }
......
...@@ -102,7 +102,9 @@ REGISTER_OPERATOR(gather, ops::GatherOp, ops::GatherOpMaker, ...@@ -102,7 +102,9 @@ REGISTER_OPERATOR(gather, ops::GatherOp, ops::GatherOpMaker,
paddle::framework::DefaultGradOpDescMaker<true>); paddle::framework::DefaultGradOpDescMaker<true>);
REGISTER_OPERATOR(gather_grad, ops::GatherGradOp); REGISTER_OPERATOR(gather_grad, ops::GatherGradOp);
REGISTER_OP_CPU_KERNEL(gather, ops::GatherOpKernel<float>, REGISTER_OP_CPU_KERNEL(gather, ops::GatherOpKernel<float>,
ops::GatherOpKernel<int>, ops::GatherOpKernel<double>); ops::GatherOpKernel<double>, ops::GatherOpKernel<int>,
ops::GatherOpKernel<int64_t>);
REGISTER_OP_CPU_KERNEL(gather_grad, ops::GatherGradientOpKernel<float>, REGISTER_OP_CPU_KERNEL(gather_grad, ops::GatherGradientOpKernel<float>,
ops::GatherGradientOpKernel<double>,
ops::GatherGradientOpKernel<int>, ops::GatherGradientOpKernel<int>,
ops::GatherGradientOpKernel<double>); ops::GatherGradientOpKernel<int64_t>);
...@@ -61,5 +61,11 @@ class GatherGradOpCUDAKernel : public framework::OpKernel<T> { ...@@ -61,5 +61,11 @@ class GatherGradOpCUDAKernel : public framework::OpKernel<T> {
} // namespace paddle } // namespace paddle
namespace ops = paddle::operators; namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(gather, ops::GatherOpCUDAKernel<float>); REGISTER_OP_CUDA_KERNEL(gather, ops::GatherOpCUDAKernel<float>,
REGISTER_OP_CUDA_KERNEL(gather_grad, ops::GatherGradOpCUDAKernel<float>); ops::GatherOpCUDAKernel<double>,
ops::GatherOpCUDAKernel<int64_t>,
ops::GatherOpCUDAKernel<int>);
REGISTER_OP_CUDA_KERNEL(gather_grad, ops::GatherGradOpCUDAKernel<float>,
ops::GatherGradOpCUDAKernel<double>,
ops::GatherGradOpCUDAKernel<int64_t>,
ops::GatherGradOpCUDAKernel<int>);
...@@ -749,7 +749,7 @@ def dynamic_gru(input, ...@@ -749,7 +749,7 @@ def dynamic_gru(input,
attr=helper.bias_attr, shape=[1, 3 * size], dtype=dtype, is_bias=True) attr=helper.bias_attr, shape=[1, 3 * size], dtype=dtype, is_bias=True)
batch_size = input.shape[0] batch_size = input.shape[0]
inputs = {'Input': input, 'Weight': weight, 'Bias': bias} inputs = {'Input': input, 'Weight': weight, 'Bias': bias}
if h_0 != None: if h_0:
assert h_0.shape == ( assert h_0.shape == (
batch_size, size batch_size, size
), 'The shape of h0 should be(batch_size, %d)' % size ), 'The shape of h0 should be(batch_size, %d)' % size
...@@ -3020,7 +3020,8 @@ def sequence_pad(x, pad_value, maxlen=None, name=None): ...@@ -3020,7 +3020,8 @@ def sequence_pad(x, pad_value, maxlen=None, name=None):
x = fluid.layers.data(name='y', shape=[10, 5], x = fluid.layers.data(name='y', shape=[10, 5],
dtype='float32', lod_level=1) dtype='float32', lod_level=1)
pad_value = fluid.layers.assign(input=numpy.array([0])) pad_value = fluid.layers.assign(
input=numpy.array([0], dtype=numpy.float32))
out = fluid.layers.sequence_pad(x=x, pad_value=pad_value) out = fluid.layers.sequence_pad(x=x, pad_value=pad_value)
""" """
......
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
# default test if(NOT APPLE)
foreach(src ${TEST_OPS}) # default test
py_test(${src} SRCS ${src}.py) foreach(src ${TEST_OPS})
endforeach() py_test(${src} SRCS ${src}.py)
endforeach()
else()
foreach(src ${TEST_OPS})
if(${src} STREQUAL "test_image_classification_vgg")
message(WARNING "These tests has been disabled in OSX for random fail: \n" ${src})
elseif(${src} STREQUAL "test_image_classification_resnet")
message(WARNING "These tests has been disabled in OSX for random fail: \n" ${src})
elseif()
py_test(${src} SRCS ${src}.py)
endif()
endforeach()
endif()
...@@ -17,6 +17,10 @@ if(NOT WITH_DISTRIBUTE) ...@@ -17,6 +17,10 @@ if(NOT WITH_DISTRIBUTE)
list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op) list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op)
LIST(REMOVE_ITEM TEST_OPS test_dist_mnist) LIST(REMOVE_ITEM TEST_OPS test_dist_mnist)
LIST(REMOVE_ITEM TEST_OPS test_dist_word2vec) LIST(REMOVE_ITEM TEST_OPS test_dist_word2vec)
LIST(REMOVE_ITEM TEST_OPS test_dist_ctr)
LIST(REMOVE_ITEM TEST_OPS test_dist_simnet_bow)
LIST(REMOVE_ITEM TEST_OPS test_dist_mnist_batch_merge)
LIST(REMOVE_ITEM TEST_OPS test_dist_text_classification)
endif(NOT WITH_DISTRIBUTE) endif(NOT WITH_DISTRIBUTE)
list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290 list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290
...@@ -89,4 +93,6 @@ py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SE ...@@ -89,4 +93,6 @@ py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SE
py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL) py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL)
set_tests_properties(test_parallel_executor_fetch_feed PROPERTIES TIMEOUT 150) set_tests_properties(test_parallel_executor_fetch_feed PROPERTIES TIMEOUT 150)
py_test_modules(test_parallel_executor_transformer MODULES test_parallel_executor_transformer SERIAL) py_test_modules(test_parallel_executor_transformer MODULES test_parallel_executor_transformer SERIAL)
py_test_modules(test_image_classification_resnet MODULES test_image_classification_resnet SERIAL) if(NOT APPLE)
py_test_modules(test_image_classification_resnet MODULES test_image_classification_resnet SERIAL)
endif()
...@@ -90,8 +90,10 @@ class TestDistMnist2x2(TestDistRunnerBase): ...@@ -90,8 +90,10 @@ class TestDistMnist2x2(TestDistRunnerBase):
inference_program = fluid.default_main_program().clone() inference_program = fluid.default_main_program().clone()
# Optimization # Optimization
opt = fluid.optimizer.AdamOptimizer( # TODO(typhoonzero): fix distributed adam optimizer
learning_rate=0.001, beta1=0.9, beta2=0.999) # opt = fluid.optimizer.AdamOptimizer(
# learning_rate=0.001, beta1=0.9, beta2=0.999)
opt = fluid.optimizer.Momentum(learning_rate=0.001, momentum=0.9)
# Reader # Reader
train_reader = paddle.batch( train_reader = paddle.batch(
......
...@@ -22,6 +22,8 @@ import signal ...@@ -22,6 +22,8 @@ import signal
import subprocess import subprocess
import six import six
import argparse import argparse
import pickle
import numpy as np
import paddle.fluid as fluid import paddle.fluid as fluid
...@@ -128,10 +130,15 @@ class TestDistRunnerBase(object): ...@@ -128,10 +130,15 @@ class TestDistRunnerBase(object):
else: else:
return origin_batch return origin_batch
out_losses = []
for _ in six.moves.xrange(RUN_STEP): for _ in six.moves.xrange(RUN_STEP):
loss, = exe.run(fetch_list=[avg_cost.name], loss, = exe.run(fetch_list=[avg_cost.name],
feed=feeder.feed(get_data())) feed=feeder.feed(get_data()))
print(loss) out_losses.append(loss[0])
if six.PY2:
print(pickle.dumps(out_losses))
else:
sys.stdout.buffer.write(pickle.dumps(out_losses))
def runtime_main(test_class): def runtime_main(test_class):
...@@ -149,7 +156,7 @@ def runtime_main(test_class): ...@@ -149,7 +156,7 @@ def runtime_main(test_class):
parser.add_argument('--use_cuda', action='store_true') parser.add_argument('--use_cuda', action='store_true')
parser.add_argument('--use_reduce', action='store_true') parser.add_argument('--use_reduce', action='store_true')
parser.add_argument( parser.add_argument(
'--use_reader_alloc', action='store_true', required=False, default=True) '--use_reader_alloc', action='store_true', required=False)
parser.add_argument('--batch_size', required=False, type=int, default=2) parser.add_argument('--batch_size', required=False, type=int, default=2)
parser.add_argument( parser.add_argument(
'--batch_merge_repeat', required=False, type=int, default=1) '--batch_merge_repeat', required=False, type=int, default=1)
...@@ -188,7 +195,7 @@ class TestDistBase(unittest.TestCase): ...@@ -188,7 +195,7 @@ class TestDistBase(unittest.TestCase):
self._pservers = 2 self._pservers = 2
self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % ( self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
self._find_free_port(), self._find_free_port()) self._find_free_port(), self._find_free_port())
self._python_interp = "python" self._python_interp = sys.executable
self._sync_mode = True self._sync_mode = True
self._enforce_place = None self._enforce_place = None
self._mem_opt = False self._mem_opt = False
...@@ -237,21 +244,6 @@ class TestDistBase(unittest.TestCase): ...@@ -237,21 +244,6 @@ class TestDistBase(unittest.TestCase):
return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
def _wait_ps_ready(self, pid):
retry_times = 50
while True:
assert retry_times >= 0, "wait ps ready failed"
time.sleep(3)
try:
# the listen_and_serv_op would touch a file which contains the listen port
# on the /tmp directory until it was ready to process all the RPC call.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error as e:
sys.stderr.write('waiting for pserver: %s, left retry %d\n' %
(e, retry_times))
retry_times -= 1
def _run_local(self, def _run_local(self,
model, model,
envs, envs,
...@@ -288,23 +280,20 @@ class TestDistBase(unittest.TestCase): ...@@ -288,23 +280,20 @@ class TestDistBase(unittest.TestCase):
env=envs) env=envs)
local_out, local_err = local_proc.communicate() local_out, local_err = local_proc.communicate()
local_ret = cpt.to_text(local_out)
if check_error_log: if check_error_log:
err_log.close() err_log.close()
sys.stderr.write('local_stdout: %s\n' % local_ret) sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))
sys.stderr.write('local_stderr: %s\n' % local_err) sys.stderr.write('local_stderr: %s\n' % local_err)
local_losses = local_ret.split("\n") return pickle.loads(local_out)
return local_losses
def _run_cluster(self, model, envs, check_error_log): def _run_cluster(self, model, envs, check_error_log):
# Run dist train to compare with local results # Run dist train to compare with local results
ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model, ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
check_error_log, envs) check_error_log, envs)
self._wait_ps_ready(ps0.pid)
self._wait_ps_ready(ps1.pid)
ps0_ep, ps1_ep = self._ps_endpoints.split(",") ps0_ep, ps1_ep = self._ps_endpoints.split(",")
tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist" tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist"
...@@ -339,8 +328,8 @@ class TestDistBase(unittest.TestCase): ...@@ -339,8 +328,8 @@ class TestDistBase(unittest.TestCase):
env0.update(envs) env0.update(envs)
env1.update(envs) env1.update(envs)
print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0)) print("tr0_cmd:{}".format(tr0_cmd))
print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1)) print("tr1_cmd:{}".format(tr1_cmd))
tr0_pipe = open("/tmp/tr0_err.log", "wb") tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb") tr1_pipe = open("/tmp/tr1_err.log", "wb")
...@@ -356,9 +345,7 @@ class TestDistBase(unittest.TestCase): ...@@ -356,9 +345,7 @@ class TestDistBase(unittest.TestCase):
env=env1) env=env1)
tr0_out, tr0_err = tr0_proc.communicate() tr0_out, tr0_err = tr0_proc.communicate()
tr0_loss_text = cpt.to_text(tr0_out)
tr1_out, tr1_err = tr1_proc.communicate() tr1_out, tr1_err = tr1_proc.communicate()
tr1_loss_text = cpt.to_text(tr1_out)
# close trainer file # close trainer file
tr0_pipe.close() tr0_pipe.close()
...@@ -373,15 +360,13 @@ class TestDistBase(unittest.TestCase): ...@@ -373,15 +360,13 @@ class TestDistBase(unittest.TestCase):
ps1.terminate() ps1.terminate()
# print log # print log
sys.stderr.write('trainer 0 stdout:\n %s\n' % tr0_loss_text) sys.stderr.write('trainer 0 stdout: %s\n' % pickle.loads(tr0_out))
sys.stderr.write('trainer 0 stderr:\n %s\n' % tr0_err) sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
sys.stderr.write('trainer 1 stdout: %s\n' % tr1_loss_text) sys.stderr.write('trainer 1 stdout: %s\n' % pickle.loads(tr1_out))
sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err) sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)
tr0_losses = tr0_loss_text.split("\n") # return tr0_losses, tr1_losses
tr1_losses = tr1_loss_text.split("\n") return pickle.loads(tr0_out), pickle.loads(tr1_out)
return tr0_losses, tr1_losses
def check_with_place(self, def check_with_place(self,
model_file, model_file,
...@@ -411,9 +396,9 @@ class TestDistBase(unittest.TestCase): ...@@ -411,9 +396,9 @@ class TestDistBase(unittest.TestCase):
check_error_log) check_error_log)
for step_id in range(RUN_STEP): for step_id in range(RUN_STEP):
local_loss = eval(local_losses[step_id])[0] local_loss = local_losses[step_id]
tr0_loss = eval(tr0_losses[step_id])[0] tr0_loss = tr0_losses[step_id]
tr1_loss = eval(tr1_losses[step_id])[0] tr1_loss = tr1_losses[step_id]
dist_loss = (tr0_loss + tr1_loss) / 2 dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
print(str(local_loss) + ":" + str(dist_loss)) print("=======", local_loss, ":", dist_loss[0], "=======")
self.assertAlmostEqual(local_loss, dist_loss, delta=delta) self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
...@@ -23,16 +23,17 @@ class TestDistSeResneXt2x2(TestDistBase): ...@@ -23,16 +23,17 @@ class TestDistSeResneXt2x2(TestDistBase):
self._use_reader_alloc = False self._use_reader_alloc = False
def test_dist_train(self): def test_dist_train(self):
self.check_with_place("dist_se_resnext.py", delta=100) self.check_with_place("dist_se_resnext.py", delta=1e-7)
class TestDistseResnXt2x2WithMemopt(TestDistBase): class TestDistseResnXt2x2WithMemopt(TestDistBase):
def _setup_config(self): def _setup_config(self):
self._sync_mode = True self._sync_mode = True
self._mem_opt = True self._mem_opt = True
self._use_reader_alloc = False
def test_dist_train(self): def test_dist_train(self):
self.check_with_place("dist_se_resnext.py", delta=100) self.check_with_place("dist_se_resnext.py", delta=1e-7)
class TestDistSeResneXt2x2Async(TestDistBase): class TestDistSeResneXt2x2Async(TestDistBase):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册