提交 f71543ee 编写于 作者: S sandyhouse

Merge branch 'add_timeline' into pipeline_exe_run

...@@ -455,6 +455,7 @@ class SectionWorker : public DeviceWorker { ...@@ -455,6 +455,7 @@ class SectionWorker : public DeviceWorker {
std::vector<std::unique_ptr<OperatorBase>> ops_; std::vector<std::unique_ptr<OperatorBase>> ops_;
static std::mutex thread_mutex; static std::mutex thread_mutex;
static std::mutex cout_mutex;
static std::condition_variable thread_condition; static std::condition_variable thread_condition;
static bool threads_completed; static bool threads_completed;
std::shared_ptr<framework::ProgramDesc> program_; std::shared_ptr<framework::ProgramDesc> program_;
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
// limitations under the License. // limitations under the License.
#if defined(PADDLE_WITH_NCCL) #if defined(PADDLE_WITH_NCCL)
#include <map>
#include "paddle/fluid/framework/data_feed_factory.h" #include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/device_worker_factory.h" #include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/trainer.h" #include "paddle/fluid/framework/trainer.h"
...@@ -44,7 +45,6 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc, ...@@ -44,7 +45,6 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc,
"must be 1 now, but the value you give is %d.", "must be 1 now, but the value you give is %d.",
num_readers)); num_readers));
auto* reader = readers[0]; auto* reader = readers[0];
feed_var_names_ = reader->GetUseSlotAlias();
workers_.resize(section_num_); workers_.resize(section_num_);
for (int i = 0; i < section_num_; ++i) { for (int i = 0; i < section_num_; ++i) {
...@@ -123,26 +123,36 @@ void PipelineTrainer::CopyParameters(int section_id, int microbatch_id, ...@@ -123,26 +123,36 @@ void PipelineTrainer::CopyParameters(int section_id, int microbatch_id,
const ProgramDesc& program, const ProgramDesc& program,
const platform::Place& place) { const platform::Place& place) {
auto& global_block = program.Block(0); auto& global_block = program.Block(0);
std::map<std::string, int> param_map;
for (auto& var : global_block.AllVars()) { for (auto& var : global_block.AllVars()) {
int is_feed_var = if (var->Persistable()) {
std::count(feed_var_names_.begin(), feed_var_names_.end(), var->Name()); param_map[var->Name()] = 1;
if ((var->Persistable() || is_feed_var) && microbatch_id == 0) { }
if (is_feed_var) { }
auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name()); for (auto& var : global_block.AllVars()) {
VLOG(3) << "data name: " << var->Name() << ", ptr: " << new_ptr; bool is_param_grad = false;
InitializeVariable(new_ptr, var->GetType()); size_t pos = 0;
} else { if ((pos = var->Name().find(kGradVarSuffix)) != std::string::npos) {
auto prefix_name = var->Name().substr(0, pos);
if (param_map.find(prefix_name) != param_map.end()) {
is_param_grad = true;
}
}
VLOG(3) << "Var name: " << var->Name();
if ((var->Persistable() || is_param_grad) && microbatch_id == 0) {
auto* ptr = root_scope_->FindVar(var->Name()); auto* ptr = root_scope_->FindVar(var->Name());
auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name()); auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name());
VLOG(3) << "Create persistable var " << var->Name() << " for minibatch " VLOG(3) << "Create persistable var " << var->Name() << " for minibatch "
<< section_id << ", which pointer is " << new_ptr; << section_id << ", which pointer is " << new_ptr;
InitializeVariable(new_ptr, var->GetType()); InitializeVariable(new_ptr, var->GetType());
if (is_param_grad) {
continue;
}
const LoDTensor& root_tensor = ptr->Get<LoDTensor>(); const LoDTensor& root_tensor = ptr->Get<LoDTensor>();
LoDTensor* minibatch_tensor = new_ptr->GetMutable<LoDTensor>(); LoDTensor* minibatch_tensor = new_ptr->GetMutable<LoDTensor>();
TensorCopy(*static_cast<const Tensor*>(&root_tensor), place, TensorCopy(*static_cast<const Tensor*>(&root_tensor), place,
static_cast<Tensor*>(minibatch_tensor)); static_cast<Tensor*>(minibatch_tensor));
} } else if (!var->Persistable() && !is_param_grad) {
} else if (!var->Persistable() && !is_feed_var) {
auto* ptr = auto* ptr =
microbatch_scopes_[section_id][microbatch_id]->Var(var->Name()); microbatch_scopes_[section_id][microbatch_id]->Var(var->Name());
VLOG(3) << "Create variable " << var->Name() << " for section " VLOG(3) << "Create variable " << var->Name() << " for section "
...@@ -244,7 +254,7 @@ void PipelineTrainer::Finalize() { ...@@ -244,7 +254,7 @@ void PipelineTrainer::Finalize() {
const LoDTensor& minibatch_tensor = minibatch_ptr->Get<LoDTensor>(); const LoDTensor& minibatch_tensor = minibatch_ptr->Get<LoDTensor>();
TensorCopy(*static_cast<const Tensor*>(&minibatch_tensor), places_[0], TensorCopy(*static_cast<const Tensor*>(&minibatch_tensor), places_[0],
static_cast<Tensor*>(root_tensor)); static_cast<Tensor*>(root_tensor));
VLOG(4) << "Copy persitable var " << var->Name() << " to root scope"; VLOG(3) << "Copy persitable var " << var->Name() << " to root scope";
} }
} }
} }
......
...@@ -32,6 +32,7 @@ namespace framework { ...@@ -32,6 +32,7 @@ namespace framework {
std::atomic<int> SectionWorker::cpu_id_(0); std::atomic<int> SectionWorker::cpu_id_(0);
std::mutex SectionWorker::thread_mutex; std::mutex SectionWorker::thread_mutex;
std::mutex SectionWorker::cout_mutex;
std::condition_variable SectionWorker::thread_condition; std::condition_variable SectionWorker::thread_condition;
bool SectionWorker::threads_completed = false; bool SectionWorker::threads_completed = false;
uint64_t SectionWorker::batch_id_(0); uint64_t SectionWorker::batch_id_(0);
...@@ -103,9 +104,14 @@ void SectionWorker::TrainFiles() { ...@@ -103,9 +104,14 @@ void SectionWorker::TrainFiles() {
} }
#endif #endif
platform::Timer batch_timer;
if (thread_id_ == 0) { if (thread_id_ == 0) {
while (true) { while (true) {
// Start a minibatch. // Start a minibatch.
// real number of microbatches run
int real_microbatch_num = 0;
batch_timer.Start();
for (int i = 0; i < num_microbatches_; ++i) { for (int i = 0; i < num_microbatches_; ++i) {
try { try {
for (auto& op : ops_) { for (auto& op : ops_) {
...@@ -137,17 +143,21 @@ void SectionWorker::TrainFiles() { ...@@ -137,17 +143,21 @@ void SectionWorker::TrainFiles() {
VLOG(3) << "called notify all"; VLOG(3) << "called notify all";
thread_condition.notify_all(); thread_condition.notify_all();
VLOG(0) << "EOF encountered"; VLOG(0) << "EOF encountered";
return; break;
} }
if (i == 0) { {
real_microbatch_num += 1;
batch_id_ += 1;
VLOG(3) << "called notify all"; VLOG(3) << "called notify all";
std::unique_lock<std::mutex> lk(thread_mutex); std::unique_lock<std::mutex> lk(thread_mutex);
batch_id_ += 1;
thread_condition.notify_all(); thread_condition.notify_all();
} }
} }
dev_ctx_->Wait();
VLOG(0) << "real_microbatch_num for thread 0 " << real_microbatch_num;
// backward pass // backward pass
for (int i = 0; i < num_microbatches_; ++i) { for (int i = 0; i < real_microbatch_num; ++i) {
for (auto& op : ops_) { for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) || if (op_role == static_cast<int>(OpRole::kBackward) ||
...@@ -163,6 +173,12 @@ void SectionWorker::TrainFiles() { ...@@ -163,6 +173,12 @@ void SectionWorker::TrainFiles() {
} }
} }
} }
dev_ctx_->Wait();
if (real_microbatch_num == 0) {
batch_timer.Pause();
VLOG(0) << "batch time: " << batch_timer.ElapsedUS();
return;
}
// update pass // update pass
for (auto& op : ops_) { for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
...@@ -177,9 +193,21 @@ void SectionWorker::TrainFiles() { ...@@ -177,9 +193,21 @@ void SectionWorker::TrainFiles() {
} }
} }
dev_ctx_->Wait(); dev_ctx_->Wait();
batch_timer.Pause();
VLOG(0) << "batch time: " << batch_timer.ElapsedUS();
{
std::unique_lock<std::mutex> lk(thread_mutex);
if (threads_completed) {
return;
}
}
} }
} else { } else {
while (true) { while (true) {
// forward pass:
bool local_completed = false;
int real_microbatch_num = 0;
for (int i = 0; i < num_microbatches_; ++i) {
{ {
PADDLE_ENFORCE_LE( PADDLE_ENFORCE_LE(
local_batch_id_, batch_id_, local_batch_id_, batch_id_,
...@@ -197,13 +225,13 @@ void SectionWorker::TrainFiles() { ...@@ -197,13 +225,13 @@ void SectionWorker::TrainFiles() {
VLOG(3) << "thread " << thread_id_ << " completed."; VLOG(3) << "thread " << thread_id_ << " completed.";
lk.unlock(); lk.unlock();
threads_completed = false; threads_completed = false;
return; local_completed = true;
break;
} }
lk.unlock(); lk.unlock();
local_batch_id_ += 1; local_batch_id_ += 1;
real_microbatch_num += 1;
} }
// forward pass:
for (int i = 0; i < num_microbatches_; ++i) {
for (auto& op : ops_) { for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch // We run op with op_role = kLRSched only for the first microbatch
...@@ -227,8 +255,9 @@ void SectionWorker::TrainFiles() { ...@@ -227,8 +255,9 @@ void SectionWorker::TrainFiles() {
} }
} }
} }
dev_ctx_->Wait();
// backward pass // backward pass
for (int i = 0; i < num_microbatches_; ++i) { for (int i = 0; i < real_microbatch_num; ++i) {
for (auto& op : ops_) { for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) || if (op_role == static_cast<int>(OpRole::kBackward) ||
...@@ -244,7 +273,11 @@ void SectionWorker::TrainFiles() { ...@@ -244,7 +273,11 @@ void SectionWorker::TrainFiles() {
} }
} }
} }
dev_ctx_->Wait();
// update pass // update pass
if (real_microbatch_num == 0) {
return;
}
for (auto& op : ops_) { for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) { if (op_role == static_cast<int>(OpRole::kOptimize)) {
...@@ -258,6 +291,9 @@ void SectionWorker::TrainFiles() { ...@@ -258,6 +291,9 @@ void SectionWorker::TrainFiles() {
} }
} }
dev_ctx_->Wait(); dev_ctx_->Wait();
if (local_completed) {
return;
}
} }
} }
} }
...@@ -307,14 +343,20 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -307,14 +343,20 @@ void SectionWorker::TrainFilesWithProfiler() {
#endif #endif
if (thread_id_ == 0) { if (thread_id_ == 0) {
struct timeval start;
struct timeval end;
struct timeval micro_start;
struct timeval micro_end;
while (true) { while (true) {
// Start a minibatch. // Start a minibatch.
// int batch_size = 0;
batch_timer.Start(); batch_timer.Start();
int real_microbatch_num = 0;
for (int i = 0; i < num_microbatches_; ++i) { for (int i = 0; i < num_microbatches_; ++i) {
try { try {
int op_idx = 0; int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) { for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch // We run op with op_role = kLRSched only for the first microbatch
// to avoid increasing the @LR_DECAY_STEP@ multiple times. // to avoid increasing the @LR_DECAY_STEP@ multiple times.
...@@ -335,7 +377,9 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -335,7 +377,9 @@ void SectionWorker::TrainFilesWithProfiler() {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get()); unused_vars_, gc.get());
} }
cudaDeviceSynchronize();
timeline.Pause(); timeline.Pause();
gettimeofday(&end, NULL);
auto time = timeline.ElapsedUS(); auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
if (time > op_max_time[op_idx]) { if (time > op_max_time[op_idx]) {
...@@ -346,9 +390,30 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -346,9 +390,30 @@ void SectionWorker::TrainFilesWithProfiler() {
} }
op_count[op_idx] += 1; op_count[op_idx] += 1;
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::FWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:SCOPE[" << i << "]:OP[" << op->Type()
<< "]:START[" << start.tv_sec * 1e6 + start.tv_usec
<< "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]"
<< std::endl;
}
} }
op_idx++; op_idx++;
} }
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START["
<< micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec
<< "]" << std::endl;
}
} catch (platform::EOFException&) { } catch (platform::EOFException&) {
std::unique_lock<std::mutex> lk(thread_mutex); std::unique_lock<std::mutex> lk(thread_mutex);
threads_completed = true; threads_completed = true;
...@@ -363,19 +428,23 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -363,19 +428,23 @@ void SectionWorker::TrainFilesWithProfiler() {
<< ", mean_time: " << op_total_time[i] / op_count[i]; << ", mean_time: " << op_total_time[i] / op_count[i];
} }
VLOG(0) << "================================"; VLOG(0) << "================================";
return; break;
} }
if (i == 0) { {
VLOG(3) << "called notify all"; VLOG(3) << "called notify all";
std::unique_lock<std::mutex> lk(thread_mutex); std::unique_lock<std::mutex> lk(thread_mutex);
real_microbatch_num += 1;
batch_id_ += 1; batch_id_ += 1;
thread_condition.notify_all(); thread_condition.notify_all();
} }
} }
dev_ctx_->Wait();
// backward pass // backward pass
for (int i = 0; i < num_microbatches_; ++i) { for (int i = 0; i < real_microbatch_num; ++i) {
int op_idx = 0; int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) { for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) || if (op_role == static_cast<int>(OpRole::kBackward) ||
op_role == (static_cast<int>(OpRole::kBackward) | op_role == (static_cast<int>(OpRole::kBackward) |
...@@ -388,6 +457,8 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -388,6 +457,8 @@ void SectionWorker::TrainFilesWithProfiler() {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get()); unused_vars_, gc.get());
} }
cudaDeviceSynchronize();
gettimeofday(&end, NULL);
timeline.Pause(); timeline.Pause();
auto time = timeline.ElapsedUS(); auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
...@@ -399,13 +470,42 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -399,13 +470,42 @@ void SectionWorker::TrainFilesWithProfiler() {
} }
op_count[op_idx] += 1; op_count[op_idx] += 1;
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::BWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:SCOPE[" << i << "]:OP[" << op->Type()
<< "]:START[" << start.tv_sec * 1e6 + start.tv_usec
<< "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]"
<< std::endl;
}
} }
op_idx++; op_idx++;
} }
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!BWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START["
<< micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec
<< "]" << std::endl;
}
}
dev_ctx_->Wait();
if (real_microbatch_num == 0) {
batch_timer.Pause();
VLOG(0) << "batch time: " << batch_timer.ElapsedUS();
return;
} }
// update pass // update pass
int op_idx = 0; int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) { for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) { if (op_role == static_cast<int>(OpRole::kOptimize)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
...@@ -416,6 +516,8 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -416,6 +516,8 @@ void SectionWorker::TrainFilesWithProfiler() {
DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1],
op.get(), unused_vars_, gc.get()); op.get(), unused_vars_, gc.get());
} }
cudaDeviceSynchronize();
gettimeofday(&end, NULL);
timeline.Pause(); timeline.Pause();
auto time = timeline.ElapsedUS(); auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
...@@ -427,15 +529,53 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -427,15 +529,53 @@ void SectionWorker::TrainFilesWithProfiler() {
} }
op_count[op_idx] += 1; op_count[op_idx] += 1;
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:SCOPE[" << num_microbatches_ << "]:OP["
<< op->Type() << "]:START["
<< start.tv_sec * 1e6 + start.tv_usec << "]:END["
<< end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
} }
op_idx++; op_idx++;
} }
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START["
<< micro_start.tv_sec * 1e6 + micro_start.tv_usec << "]:END["
<< micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]"
<< std::endl;
}
dev_ctx_->Wait(); dev_ctx_->Wait();
batch_timer.Pause(); batch_timer.Pause();
VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); VLOG(0) << "batch time: " << batch_timer.ElapsedUS();
{
std::unique_lock<std::mutex> lk(thread_mutex);
if (threads_completed) {
return;
}
}
} }
} else { } else {
struct timeval start;
struct timeval end;
struct timeval micro_start;
struct timeval micro_end;
cudaEvent_t cu_start, cu_stop;
cudaEventCreate(&cu_start);
cudaEventCreate(&cu_stop);
bool local_completed = false;
while (true) { while (true) {
// forward pass:
int real_microbatch_num = 0;
for (int i = 0; i < num_microbatches_; ++i) {
{ {
PADDLE_ENFORCE_LE( PADDLE_ENFORCE_LE(
local_batch_id_, batch_id_, local_batch_id_, batch_id_,
...@@ -450,25 +590,27 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -450,25 +590,27 @@ void SectionWorker::TrainFilesWithProfiler() {
VLOG(3) << "thread " << thread_id_ << " local_batch_id_ " VLOG(3) << "thread " << thread_id_ << " local_batch_id_ "
<< local_batch_id_ << " batch_id_ " << batch_id_; << local_batch_id_ << " batch_id_ " << batch_id_;
if (threads_completed) { if (threads_completed) {
local_completed = true;
VLOG(3) << "thread " << thread_id_ << " completed."; VLOG(3) << "thread " << thread_id_ << " completed.";
lk.unlock(); lk.unlock();
VLOG(0) << "============timeline============"; VLOG(0) << "============timeline============";
for (size_t i = 0; i < ops_.size(); ++i) { for (size_t i = 0; i < ops_.size(); ++i) {
VLOG(0) << "op: " << op_name[i] << ", max_time: " << op_max_time[i] VLOG(0) << "op: " << op_name[i]
<< ", max_time: " << op_max_time[i]
<< ", min_time: " << op_min_time[i] << ", min_time: " << op_min_time[i]
<< ", mean_time: " << op_total_time[i] / op_count[i]; << ", mean_time: " << op_total_time[i] / op_count[i];
} }
VLOG(0) << "================================"; VLOG(0) << "================================";
threads_completed = false; break;
return;
} }
lk.unlock(); lk.unlock();
real_microbatch_num += 1;
local_batch_id_ += 1; local_batch_id_ += 1;
} }
// forward pass:
for (int i = 0; i < num_microbatches_; ++i) {
int op_idx = 0; int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) { for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch // We run op with op_role = kLRSched only for the first microbatch
// to avoid increasing the @LR_DECAY_STEP@ multiple times. // to avoid increasing the @LR_DECAY_STEP@ multiple times.
...@@ -489,6 +631,8 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -489,6 +631,8 @@ void SectionWorker::TrainFilesWithProfiler() {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get()); unused_vars_, gc.get());
} }
cudaDeviceSynchronize();
gettimeofday(&end, NULL);
timeline.Pause(); timeline.Pause();
auto time = timeline.ElapsedUS(); auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
...@@ -500,14 +644,38 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -500,14 +644,38 @@ void SectionWorker::TrainFilesWithProfiler() {
} }
op_count[op_idx] += 1; op_count[op_idx] += 1;
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::FWD:B[" << local_batch_id_ << "]:SEC["
<< thread_id_ << "]:SCOPE[" << i << "]:OP["
<< op->Type() << "]:START["
<< start.tv_sec * 1e6 + start.tv_usec << "]:END["
<< end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
} }
op_idx++; op_idx++;
} }
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START["
<< micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec
<< "]" << std::endl;
} }
}
dev_ctx_->Wait();
// backward pass // backward pass
for (int i = 0; i < num_microbatches_; ++i) { for (int i = 0; i < real_microbatch_num; ++i) {
int op_idx = 0; int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) { for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) || if (op_role == static_cast<int>(OpRole::kBackward) ||
op_role == (static_cast<int>(OpRole::kBackward) | op_role == (static_cast<int>(OpRole::kBackward) |
...@@ -520,6 +688,8 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -520,6 +688,8 @@ void SectionWorker::TrainFilesWithProfiler() {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get()); unused_vars_, gc.get());
} }
cudaDeviceSynchronize();
gettimeofday(&end, NULL);
timeline.Pause(); timeline.Pause();
auto time = timeline.ElapsedUS(); auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
...@@ -531,13 +701,40 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -531,13 +701,40 @@ void SectionWorker::TrainFilesWithProfiler() {
} }
op_count[op_idx] += 1; op_count[op_idx] += 1;
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::BWD:B[" << local_batch_id_ << "]:SEC["
<< thread_id_ << "]:SCOPE[" << i << "]:OP["
<< op->Type() << "]:START["
<< start.tv_sec * 1e6 + start.tv_usec << "]:END["
<< end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
} }
op_idx++; op_idx++;
} }
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!BWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START["
<< micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec
<< "]" << std::endl;
}
}
dev_ctx_->Wait();
if (real_microbatch_num == 0) {
return;
} }
// update pass // update pass
int op_idx = 0; int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) { for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role")); int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) { if (op_role == static_cast<int>(OpRole::kOptimize)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
...@@ -548,6 +745,8 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -548,6 +745,8 @@ void SectionWorker::TrainFilesWithProfiler() {
DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1],
op.get(), unused_vars_, gc.get()); op.get(), unused_vars_, gc.get());
} }
cudaDeviceSynchronize();
gettimeofday(&end, NULL);
timeline.Pause(); timeline.Pause();
auto time = timeline.ElapsedUS(); auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
...@@ -559,10 +758,34 @@ void SectionWorker::TrainFilesWithProfiler() { ...@@ -559,10 +758,34 @@ void SectionWorker::TrainFilesWithProfiler() {
} }
op_count[op_idx] += 1; op_count[op_idx] += 1;
op_total_time[op_idx] += time; op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:SCOPE[" << num_microbatches_ << "]:OP["
<< op->Type() << "]:START["
<< start.tv_sec * 1e6 + start.tv_usec << "]:END["
<< end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
} }
op_idx++; op_idx++;
} }
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START["
<< micro_start.tv_sec * 1e6 + micro_start.tv_usec << "]:END["
<< micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]"
<< std::endl;
}
dev_ctx_->Wait(); dev_ctx_->Wait();
if (local_completed) {
return;
}
} }
} }
} }
......
...@@ -223,7 +223,6 @@ class PipelineTrainer : public TrainerBase { ...@@ -223,7 +223,6 @@ class PipelineTrainer : public TrainerBase {
int section_num_; int section_num_;
int num_microbatches_; int num_microbatches_;
int start_cpu_core_id_; int start_cpu_core_id_;
std::vector<std::string> feed_var_names_;
std::vector<platform::Place> places_; std::vector<platform::Place> places_;
std::vector<std::vector<std::string>> skip_vars_; std::vector<std::vector<std::string>> skip_vars_;
TrainerDesc trainer_desc_; TrainerDesc trainer_desc_;
......
...@@ -48,8 +48,9 @@ __all__ = [ ...@@ -48,8 +48,9 @@ __all__ = [
'AdamOptimizer', 'AdamaxOptimizer', 'DpsgdOptimizer', 'AdamOptimizer', 'AdamaxOptimizer', 'DpsgdOptimizer',
'DecayedAdagradOptimizer', 'RMSPropOptimizer', 'FtrlOptimizer', 'Adadelta', 'DecayedAdagradOptimizer', 'RMSPropOptimizer', 'FtrlOptimizer', 'Adadelta',
'AdadeltaOptimizer', 'ModelAverage', 'LarsMomentum', 'AdadeltaOptimizer', 'ModelAverage', 'LarsMomentum',
'LarsMomentumOptimizer', 'LambOptimizer', 'ExponentialMovingAverage', 'LarsMomentumOptimizer', 'DGCMomentumOptimizer', 'LambOptimizer',
'PipelineOptimizer', 'LookaheadOptimizer', 'RecomputeOptimizer' 'ExponentialMovingAverage', 'PipelineOptimizer', 'LookaheadOptimizer',
'RecomputeOptimizer'
] ]
...@@ -3709,15 +3710,9 @@ class PipelineOptimizer(object): ...@@ -3709,15 +3710,9 @@ class PipelineOptimizer(object):
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
batch_size = 1 batch_size = 1
filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
dataset.set_use_var([x,y])
dataset.set_batch_size(batch_size)
dataset.set_filelist(filelist)
data_loader.start() data_loader.start()
exe.train_from_dataset( exe.train_from_dataset(
fluid.default_main_program(), fluid.default_main_program())
dataset)
data_loader.reset() data_loader.reset()
""" """
...@@ -3735,7 +3730,7 @@ class PipelineOptimizer(object): ...@@ -3735,7 +3730,7 @@ class PipelineOptimizer(object):
"num_microbatches must be a positive value.") "num_microbatches must be a positive value.")
self._num_microbatches = num_microbatches self._num_microbatches = num_microbatches
assert start_cpu_core_id >= 0, ( assert start_cpu_core_id >= 0, (
"start_cpu_core_id must be greater than or equal to 0.") "start_cpu_core_id must be a non negative integer.")
self._start_cpu_core_id = start_cpu_core_id self._start_cpu_core_id = start_cpu_core_id
self._place_list = None self._place_list = None
op_maker = core.op_proto_and_checker_maker op_maker = core.op_proto_and_checker_maker
...@@ -3743,7 +3738,7 @@ class PipelineOptimizer(object): ...@@ -3743,7 +3738,7 @@ class PipelineOptimizer(object):
self._op_role_key = op_maker.kOpRoleAttrName() self._op_role_key = op_maker.kOpRoleAttrName()
self._op_role_var_key = op_maker.kOpRoleVarAttrName() self._op_role_var_key = op_maker.kOpRoleVarAttrName()
self._op_device_key = op_maker.kOpDeviceAttrName() self._op_device_key = op_maker.kOpDeviceAttrName()
self._param_device_map = dict() self._param_device_map = None
def _create_vars(self, block, main_program): def _create_vars(self, block, main_program):
# Create vars for block, copied from main_program's global block # Create vars for block, copied from main_program's global block
...@@ -3782,9 +3777,10 @@ class PipelineOptimizer(object): ...@@ -3782,9 +3777,10 @@ class PipelineOptimizer(object):
return 'Param' in op.input_names and 'Grad' in op.input_names and ( return 'Param' in op.input_names and 'Grad' in op.input_names and (
"LearningRate" in op.input_names) "LearningRate" in op.input_names)
def _split_program(self, main_program): def _split_program(self, main_program, devices):
""" """
Split a program into sections according to devices that ops run on. Split a program into sections according to devices that ops run on.
The ops of the role LRSched are copied to all sections.
Args: Args:
main_program (Program): the main program main_program (Program): the main program
...@@ -3792,14 +3788,23 @@ class PipelineOptimizer(object): ...@@ -3792,14 +3788,23 @@ class PipelineOptimizer(object):
programs = [] programs = []
# Map from device to its corresponding section program info # Map from device to its corresponding section program info
device_program_map = dict() device_program_map = dict()
block = main_program.block(0) for device in devices:
p = {'program': Program()}
device_program_map[device] = p
block = main_program.block(0)
for op in block.ops: for op in block.ops:
device = op.attr(self._op_device_key) device = op.attr(self._op_device_key)
op_role = op.attr(self._op_role_key)
if device not in device_program_map: if int(op_role) & int(self._op_role.LRSched):
program = {"program": Program()} # Copy ops of the role LRSched to all sections.
device_program_map[device] = program for device in device_program_map.keys():
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, device)
else:
program = device_program_map[device] program = device_program_map[device]
op_desc = op.desc op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op() ap_op = program["program"].block(0).desc.append_op()
...@@ -3833,9 +3838,8 @@ class PipelineOptimizer(object): ...@@ -3833,9 +3838,8 @@ class PipelineOptimizer(object):
for in_var_name in op.input_arg_names: for in_var_name in op.input_arg_names:
if in_var_name == var_name: if in_var_name == var_name:
post_op.append(op) post_op.append(op)
break
if post_op: if post_op:
if not len(post_op) == 1:
raise ValueError("Each op can only have one post op.")
return post_op[0] return post_op[0]
return None return None
...@@ -3890,60 +3894,26 @@ class PipelineOptimizer(object): ...@@ -3890,60 +3894,26 @@ class PipelineOptimizer(object):
def _get_data_var_info(self, block): def _get_data_var_info(self, block):
""" """
Get all vars whose is_data attribute are true and then rename them. Get all vars whose is_data attribute are true and then rename them.
For PipelineTrainer, all data vars are binded to
minibatch scope, so we have to feed them to the microbatch
to avoid conflicts. The vars feeded to microbatch have to
be renamed.
""" """
# A map from var name to the renamed name. # map of data vars to devices that that data on
raw_name_new_name_map = dict()
# Because we will create vars in block, it is more safe
# to get all var_names before iteration.
var_names = list(block.vars.keys())
for var_name in var_names:
var = block.var(var_name)
if not var.is_data:
continue
assert var_name not in raw_name_new_name_map, (
"{} has already been processed.".format(var_name))
new_name = unique_name.generate(var_name)
raw_name_new_name_map[var_name] = new_name
new_var = self._create_var(block, var, new_name)
new_var.is_data = False
# map of data to devices that that data on
data_devices_map = dict() data_devices_map = dict()
for op in block.ops: for op in block.ops:
dev_spec = op.attr(self._op_device_key) dev_spec = op.attr(self._op_device_key)
for var_name in op.input_arg_names: for var_name in op.input_arg_names:
if var_name not in raw_name_new_name_map: if "blocking_queue" in var_name: continue
var = block.var(var_name)
if not var.is_data:
continue continue
if not var_name in data_devices_map: if not var_name in data_devices_map:
data_devices_map[var_name] = [] data_devices_map[var_name] = []
if not dev_spec in data_devices_map[var_name]: if not dev_spec in data_devices_map[var_name]:
data_devices_map[var_name].append(dev_spec) data_devices_map[var_name].append(dev_spec)
new_name = raw_name_new_name_map[var_name] return data_devices_map
#self._rename_arg(op, var_name, new_name)
return data_devices_map, raw_name_new_name_map
def _rename_var_in_block(self, block, raw_name_new_name_map):
"""
Rename vars whose names in raw_name_new_name_map to the corresponding
new names.
"""
for op in block.ops:
if op.type == "enqueue" or op.type == "dequeue":
continue
for var_name in op.input_arg_names:
if var_name in raw_name_new_name_map:
new_name = raw_name_new_name_map[var_name]
self._rename_arg(op, var_name, new_name)
def _insert_enq_deq_for_data_var(self, main_block, programs, startup, def _insert_enq_deq_for_data_var(self, main_block, programs, startup,
devices): devices):
""" """
Insert enqueue and dequeue ops for data var Insert enqueue and dequeue ops for data var that on other devices.
Args: Args:
main_block (Block): Global block for main program main_block (Block): Global block for main program
...@@ -3952,22 +3922,19 @@ class PipelineOptimizer(object): ...@@ -3952,22 +3922,19 @@ class PipelineOptimizer(object):
devices (list): List of devices in the format (dev:dev_index) devices (list): List of devices in the format (dev:dev_index)
""" """
main_program = main_block.program main_program = main_block.program
data_devices_map, raw_name_new_name_map = self._get_data_var_info( data_devices_map = self._get_data_var_info(main_block)
main_block)
first_prog = programs[0]['program'] first_prog = programs[0]['program']
first_block = first_prog.block(0) first_block = first_prog.block(0)
enqueue_index = 0 enqueue_index = 0
if first_block.ops[0].type == "create_py_reader" or (
first_block.ops[1].type == "create_py_reader"):
for op in first_block.ops: for op in first_block.ops:
if op.type == "read":
enqueue_index += 1 enqueue_index += 1
if op.type == "read":
break break
enqueue_index += 1
first_dev_spec = devices[0] first_dev_spec = devices[0]
for var_name in data_devices_map.keys(): for var_name in data_devices_map.keys():
for device in data_devices_map[var_name]: for device in data_devices_map[var_name]:
if device == first_dev_spec: continue
# step1: generate queue for each pair of data var and device # step1: generate queue for each pair of data var and device
# that that data on # that that data on
queue_name = var_name + "_blocking_queue" queue_name = var_name + "_blocking_queue"
...@@ -4001,13 +3968,10 @@ class PipelineOptimizer(object): ...@@ -4001,13 +3968,10 @@ class PipelineOptimizer(object):
prog = programs[prog_index]['program'] prog = programs[prog_index]['program']
block = prog.block(0) block = prog.block(0)
index = 0 index = 0
if device == first_dev_spec:
index = enqueue_index + 1
new_name = raw_name_new_name_map[var_name]
source_var = main_program.block(0).var(var_name) source_var = main_program.block(0).var(var_name)
new_var = self._create_var(block, source_var, new_name) new_var = self._create_var(block, source_var, var_name)
block._insert_op( block._insert_op(
index=index, index=0,
type='dequeue', type='dequeue',
outputs={'Out': [new_var]}, outputs={'Out': [new_var]},
attrs={ attrs={
...@@ -4015,7 +3979,6 @@ class PipelineOptimizer(object): ...@@ -4015,7 +3979,6 @@ class PipelineOptimizer(object):
self._op_role_key: self._op_role.Forward, self._op_role_key: self._op_role.Forward,
'queue_name': queue_name, 'queue_name': queue_name,
}) })
self._rename_var_in_block(block, raw_name_new_name_map)
def _strip_grad_suffix(self, name): def _strip_grad_suffix(self, name):
""" """
...@@ -4030,18 +3993,6 @@ class PipelineOptimizer(object): ...@@ -4030,18 +3993,6 @@ class PipelineOptimizer(object):
""" """
return name + core.grad_var_suffix() return name + core.grad_var_suffix()
def _update_param_device_map(self, params_grads, block):
for param_grad in params_grads:
if not param_grad[0].trainable: continue
param_name = param_grad[0].name
ops = block.ops
for op in ops:
input_arg_names = op.input_arg_names
if param_name in input_arg_names:
self._param_device_map[param_name] = op.attr(
self._op_device_key)
break
def _add_opdevice_attr_for_regularization_clip(self, block): def _add_opdevice_attr_for_regularization_clip(self, block):
""" """
Add op_device attribute for regulization and clip ops. Add op_device attribute for regulization and clip ops.
...@@ -4056,7 +4007,7 @@ class PipelineOptimizer(object): ...@@ -4056,7 +4007,7 @@ class PipelineOptimizer(object):
assert self._op_role_var_key in op.attr_names assert self._op_role_var_key in op.attr_names
op_role_var = op.all_attrs()[self._op_role_var_key] op_role_var = op.all_attrs()[self._op_role_var_key]
assert len(op_role_var) == 2 assert len(op_role_var) == 2
param_name = block.vars[op_role_var[0]].name param_name = op_role_var[0]
device = self._param_device_map[param_name] device = self._param_device_map[param_name]
op._set_attr(self._op_device_key, device) op._set_attr(self._op_device_key, device)
...@@ -4125,6 +4076,8 @@ class PipelineOptimizer(object): ...@@ -4125,6 +4076,8 @@ class PipelineOptimizer(object):
"{} has not been set.".format(op.type)) "{} has not been set.".format(op.type))
if not dev_spec in device_specs: if not dev_spec in device_specs:
device_specs.append(dev_spec) device_specs.append(dev_spec)
sorted_device_specs = sorted(device_specs)
assert sorted_device_specs == device_specs
return device_specs return device_specs
def _insert_enq_deq_ops_for_boundaries(self, block, origin_block, def _insert_enq_deq_ops_for_boundaries(self, block, origin_block,
...@@ -4141,6 +4094,11 @@ class PipelineOptimizer(object): ...@@ -4141,6 +4094,11 @@ class PipelineOptimizer(object):
var_devspec = dict() var_devspec = dict()
for index, op in list(enumerate(origin_block.ops)): for index, op in list(enumerate(origin_block.ops)):
# skips lr-related op and vars, as we will process them later.
if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched):
continue
if self._is_update_op(op): continue
cur_device_spec = op.attr(self._op_device_key) cur_device_spec = op.attr(self._op_device_key)
for var_name in op.input_arg_names: for var_name in op.input_arg_names:
# i.e., lod_tensor_blocking_queue created by DataLoader, # i.e., lod_tensor_blocking_queue created by DataLoader,
...@@ -4196,82 +4154,32 @@ class PipelineOptimizer(object): ...@@ -4196,82 +4154,32 @@ class PipelineOptimizer(object):
}) })
extra_index += 1 extra_index += 1
def _add_dequeue_ops_for_optimize(self, block, startup_program): def _clear_gradients(self, main_block):
startup_block = startup_program.global_block() """
grad_queue_map = dict() Clear gradients at the begining of each run of a minibatch.
grad_device_map = dict() """
optimize_index = None for param_name in self._param_device_map:
grad_names_to_dequeue = [] grad_name = self._append_grad_suffix(param_name)
param_var = main_block.vars[param_name]
for index, op in reversed(list(enumerate(block.ops))): grad_var = main_block.vars[grad_name]
device = op.attr(self._op_device_key) device = self._param_device_map[param_name]
# Optimizer pass main_block._insert_op(
if not self._is_optimize_op(op): index=0,
optimize_index = index + 1 type='fill_constant',
break inputs={},
if not self._is_update_op(op): continue outputs={'Out': [grad_var]},
assert self._op_role_var_key in op.attr_names
op_role_var = op.all_attrs()[self._op_role_var_key]
assert len(op_role_var) == 2
grad_name = op_role_var[1]
assert grad_name not in grad_device_map
assert grad_name not in grad_names_to_dequeue
grad_device_map[grad_name] = device
grad_names_to_dequeue.append(grad_name)
for grad_name in grad_names_to_dequeue:
device = grad_device_map[grad_name]
grad_names = []
grads = []
queue_name = grad_name + "_blocking_queue"
queue_name = unique_name.generate(queue_name)
grad_queue_map[grad_name] = queue_name
ref_var = block.vars[grad_name]
queue_var = startup_block.create_var(
name=queue_name,
persistable=True,
type=core.VarDesc.VarType.RAW)
startup_block.append_op(
type='queue_generator',
attrs={
'names': [queue_name],
'capacity': self._num_microbatches
})
orig_var_name = self._strip_grad_suffix(grad_name)
for _ in range(self._num_microbatches):
u_name = unique_name.generate(orig_var_name)
u_grad_name = self._append_grad_suffix(u_name)
grad_var = self._create_var(block, ref_var, u_grad_name)
grad_names.append(u_grad_name)
grads.append(grad_var)
block._insert_op(
index=optimize_index,
type='dequeue',
outputs={'Out': grads},
attrs={
self._op_device_key: device,
'queue_name': queue_name,
self._op_role_key: self._op_role.Optimize
})
block._insert_op(
index=optimize_index + 1,
type='sum',
inputs={'X': grad_names},
outputs={'Out': ref_var},
attrs={ attrs={
'shape': grad_var.shape,
'dtype': grad_var.dtype,
'value': float(0),
self._op_device_key: device, self._op_device_key: device,
self._op_role_key: self._op_role.Optimize self._op_role_key: self._op_role.Optimize.LRSched,
}) })
return grad_queue_map
def _insert_enq_deq_ops_for_update(self, block, startup_program): def _accumulate_gradients(self, block):
""" """
Insert enqueue and dequeue ops for gradients of parameters. Accumulate the graident generated in microbatch to the one in mini-batch.
""" """
startup_block = startup_program.global_block()
grad_queue_map = self._add_dequeue_ops_for_optimize(block,
startup_program)
for index, op in reversed(list(enumerate(block.ops))): for index, op in reversed(list(enumerate(block.ops))):
offset = index offset = index
device = op.attr(self._op_device_key) device = op.attr(self._op_device_key)
...@@ -4298,19 +4206,25 @@ class PipelineOptimizer(object): ...@@ -4298,19 +4206,25 @@ class PipelineOptimizer(object):
if len(op_role_var) == 0: if len(op_role_var) == 0:
continue continue
assert len(op_role_var) % 2 == 0 assert len(op_role_var) % 2 == 0
offset = index
for i in range(0, len(op_role_var), 2): for i in range(0, len(op_role_var), 2):
grad_name = op_role_var[i + 1] grad_name = op_role_var[i + 1]
grad_var = block.vars[grad_name] grad_var = block.vars[grad_name]
assert grad_name in grad_queue_map param_name = op_role_var[i]
queue_name = grad_queue_map[grad_name] param_var = block.vars[param_name]
new_var_name = unique_name.generate(param_name)
new_var_name = self._append_grad_suffix(new_var_name)
new_var = self._create_var(block, grad_var, new_var_name)
self._rename_arg(op, grad_name, new_var_name)
block._insert_op( block._insert_op(
index=offset + 1, index=offset + 1,
type='enqueue', type='sum',
inputs={'X': block.vars[grad_name]}, inputs={'X': [grad_var, new_var]},
outputs={'Out': grad_var},
attrs={ attrs={
'queue_name': queue_name,
self._op_device_key: device, self._op_device_key: device,
self._op_role_key: self._op_role.Backward self._op_role_key: self._op_role.Backward,
self._op_role_var_key: op_role_var
}) })
offset += 1 offset += 1
...@@ -4333,6 +4247,7 @@ class PipelineOptimizer(object): ...@@ -4333,6 +4247,7 @@ class PipelineOptimizer(object):
def _get_device_info(self, block): def _get_device_info(self, block):
for op in block.ops: for op in block.ops:
if not op._has_kernel(op.type): continue if not op._has_kernel(op.type): continue
op_device = op.attr(self._op_device_key) op_device = op.attr(self._op_device_key)
return op_device return op_device
...@@ -4438,14 +4353,16 @@ class PipelineOptimizer(object): ...@@ -4438,14 +4353,16 @@ class PipelineOptimizer(object):
startup_program = default_startup_program() startup_program = default_startup_program()
optimize_ops, params_grads = self._optimizer.minimize( optimize_ops, params_grads = self._optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set) loss, startup_program, parameter_list, no_grad_set)
self._update_param_device_map(params_grads, main_block) self._param_device_map = self._optimizer._param_device_map
# Step1: add default op_device attribute for regulization and clip ops # Step1: add default op_device attribute for regulization and clip ops
self._add_opdevice_attr_for_regularization_clip(main_block) self._add_opdevice_attr_for_regularization_clip(main_block)
# Step2: add default op_device attribute for ops whose op_device # Step2: add default op_device attribute for ops whose op_device
# attribute have not been set yet. # attribute have not been set yet. Then check all ops have the
# op_device attribute.
self._add_default_opdevice_attr(main_block) self._add_default_opdevice_attr(main_block)
device_specs = self._check_validation(main_block) device_specs = self._check_validation(main_block)
# Step3: add enqueue and dequeue ops between section boundaries # Step3: add enqueue and dequeue ops between section boundaries
...@@ -4454,8 +4371,10 @@ class PipelineOptimizer(object): ...@@ -4454,8 +4371,10 @@ class PipelineOptimizer(object):
self._insert_enq_deq_ops_for_boundaries(main_block, origin_main_block, self._insert_enq_deq_ops_for_boundaries(main_block, origin_main_block,
startup_program) startup_program)
# Step4: add a pair of enqueue and dequeueN for parameter gradients # Step4: accumulate gradients during backward
self._insert_enq_deq_ops_for_update(main_block, startup_program) # and clear them after update
self._clear_gradients(main_block)
self._accumulate_gradients(main_block)
main_program = main_block.program main_program = main_block.program
...@@ -4474,16 +4393,9 @@ class PipelineOptimizer(object): ...@@ -4474,16 +4393,9 @@ class PipelineOptimizer(object):
# Step5: split program into sections and add pairs of # Step5: split program into sections and add pairs of
# enqueue and dequeue ops for data var. # enqueue and dequeue ops for data var.
if len(place_list) == 0: if len(place_list) <= 1:
program_list = [] raise ValueError("Run on one device, do not use pipeline.")
ptmp = { program_list = self._split_program(main_program, device_specs)
"program": main_program,
"input_set": set(),
"output_set": set()
}
program_list.append(ptmp)
else:
program_list = self._split_program(main_program)
for p in program_list: for p in program_list:
self._create_vars(p["program"].block(0), main_program) self._create_vars(p["program"].block(0), main_program)
self._insert_enq_deq_for_data_var(main_block, program_list, self._insert_enq_deq_for_data_var(main_block, program_list,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册