From a6344af22e323b4e9f8fdda1ea7705102979d59b Mon Sep 17 00:00:00 2001 From: sandyhouse Date: Thu, 10 Sep 2020 08:41:19 +0000 Subject: [PATCH] update, test=develop --- paddle/fluid/framework/device_worker.h | 17 +- paddle/fluid/framework/pipeline_trainer.cc | 371 ++++++---- paddle/fluid/framework/section_worker.cc | 780 ++++++--------------- paddle/fluid/framework/trainer.h | 34 +- paddle/fluid/framework/trainer_desc.proto | 2 +- python/paddle/fluid/optimizer.py | 112 +-- 6 files changed, 501 insertions(+), 815 deletions(-) diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index 3336b5783a8..49908be40e7 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -414,7 +414,8 @@ class HeterCpuWorker : public HogwildWorker { #if defined(PADDLE_WITH_NCCL) class SectionWorker : public DeviceWorker { public: - SectionWorker() { local_batch_id_ = 0; } + // SectionWorker() { local_batch_id_ = 0; } + SectionWorker() {} ~SectionWorker() override {} void Initialize(const TrainerDesc& desc) override; @@ -429,7 +430,7 @@ class SectionWorker : public DeviceWorker { const platform::Place& place() const { return place_; } - void SetSectionIndex(int section_id) { section_id_ = section_id; } + // void SetSectionIndex(int section_id) { section_id_ = section_id; } void SetDeviceIndex(int tid) override {} void SetThreadIndex(int thread_id) { thread_id_ = thread_id; } void SetMicrobatchNum(int num) { num_microbatches_ = num; } @@ -440,7 +441,7 @@ class SectionWorker : public DeviceWorker { void SetSkipVars(const std::vector& skip_vars) { skip_vars_ = skip_vars; } - static void ResetBatchId() { batch_id_ = 0; } + // static void ResetBatchId() { batch_id_ = 0; } static std::atomic cpu_id_; @@ -454,13 +455,13 @@ class SectionWorker : public DeviceWorker { const Scope* minibatch_scope_; std::vector> ops_; - static std::mutex thread_mutex; - static std::mutex cout_mutex; - static std::condition_variable thread_condition; - static bool threads_completed; + // static std::mutex thread_mutex; + // static std::mutex cout_mutex; + // static std::condition_variable thread_condition; + // static bool threads_completed; std::shared_ptr program_; static uint64_t batch_id_; - uint64_t local_batch_id_; + // uint64_t local_batch_id_; platform::DeviceContext* dev_ctx_ = nullptr; }; diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index b827435508f..8c6c3157437 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -27,73 +27,88 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc, const auto& section_params = trainer_desc.section_param(); num_microbatches_ = section_params.num_microbatches(); VLOG(3) << "Number of microbatches per minibatch: " << num_microbatches_; - section_num_ = section_params.section_config_size(); - VLOG(3) << "Number of program sections: " << section_num_; trainer_desc_ = trainer_desc; start_cpu_core_id_ = section_params.start_cpu_core_id(); - SetDataset(dataset); + // SetDataset(dataset); ParseDumpConfig(trainer_desc); // get filelist from trainer_desc here - const std::vector readers = - dataset->GetReaders(); - VLOG(3) << "readers num: " << readers.size(); - int num_readers = readers.size(); - PADDLE_ENFORCE_EQ(num_readers, 1, - platform::errors::InvalidArgument( - "Number of dataset readers for pipeline " - "must be 1 now, but the value you give is %d.", - num_readers)); - auto* reader = readers[0]; + 
// const std::vector readers = + // VLOG(3) << "Number of program sections: " << section_num_; + // dataset->GetReaders(); + // VLOG(3) << "readers num: " << readers.size(); + // int num_readers = readers.size(); + // PADDLE_ENFORCE_EQ(num_readers, 1, + // platform::errors::InvalidArgument( + // "Number of dataset readers for pipeline " + // "must be 1 now, but the value you give is %d.", + // num_readers)); + // auto* reader = readers[0]; - workers_.resize(section_num_); - for (int i = 0; i < section_num_; ++i) { - const auto& section_config = section_params.section_config(i); - platform::Place place; - int place_id = section_config.place_id(); - switch (section_config.place()) { - case SectionConfig::CPUPlace: - place = platform::CPUPlace(); - break; - case SectionConfig::CUDAPlace: - // Note that one section has at most one GPU place in one pipeline - PADDLE_ENFORCE_GE( - place_id, 0, - platform::errors::InvalidArgument( - "The place_id value for CUDAPlace shoud be greater " - "than or equal to 0, but the value you give is %d.", - place_id)); - place = platform::CUDAPlace(place_id); - break; - case SectionConfig::CUDAPinnedPlace: - place = platform::CUDAPinnedPlace(); - break; - default: - PADDLE_ENFORCE_NOT_NULL(nullptr, - platform::errors::InvalidArgument( - "Unkown place type in SectionConfig: %d", - section_config.place())); - } - places_.emplace_back(place); - VLOG(3) << "Device worker place: " << place << ", device id: " << place_id - << ", section: " << i; + // workers_.resize(section_num_); + // for (int i = 0; i < section_num_; ++i) { + // const auto& section_config = section_params.section_config(i); + // platform::Place place; + // int place_id = section_config.place_id(); + // switch (section_config.place()) { + // case SectionConfig::CPUPlace: + // place = platform::CPUPlace(); + // break; + // case SectionConfig::CUDAPlace: + // // Note that one section has at most one GPU place in one pipeline + // PADDLE_ENFORCE_GE( + // place_id, 0, + // platform::errors::InvalidArgument( + // "The place_id value for CUDAPlace shoud be greater " + // "than or equal to 0, but the value you give is %d.", + // place_id)); + // place = platform::CUDAPlace(place_id); + // break; + // case SectionConfig::CUDAPinnedPlace: + // place = platform::CUDAPinnedPlace(); + // break; + // default: + // PADDLE_ENFORCE_NOT_NULL(nullptr, + // platform::errors::InvalidArgument( + // "Unkown place type in SectionConfig: %d", + // section_config.place())); + // } + // places_.emplace_back(place); + // VLOG(3) << "Device worker place: " << place << ", device id: " << place_id + // << ", section: " << i; + + // workers_[i] = DeviceWorkerFactory::CreateDeviceWorker( + // trainer_desc.device_worker_name()); + // auto this_worker = + // std::dynamic_pointer_cast( + // workers_[i]); + // if (i == 0) { + // // we only set reader for the first section + // this_worker->SetDataFeed(reader); + // this_worker->SetReaderPlace(place); + // } + // this_worker->SetThreadIndex(i); + // this_worker->SetSectionIndex(i); + // this_worker->SetPlace(place); + // this_worker->Initialize(trainer_desc); + // this_worker->SetMicrobatchNum(num_microbatches_); + //} + const auto& section_config = section_params.section_config(); + int place_id = section_config.place_id(); + PADDLE_ENFORCE_GE(place_id, 0, + platform::errors::InvalidArgument( + "The place_id value for CUDAPlace shoud be " + "non-negative, but the value given is %d.", + place_id)); + place_ = platform::CUDAPlace(place_id); + worker_ = 
DeviceWorkerFactory::CreateDeviceWorker( + trainer_desc.device_worker_name()); + auto this_worker = + std::dynamic_pointer_cast(worker_); + this_worker->SetPlace(place_); + this_worker->Initialize(trainer_desc); + this_worker->SetMicrobatchNum(num_microbatches_); - workers_[i] = DeviceWorkerFactory::CreateDeviceWorker( - trainer_desc.device_worker_name()); - auto this_worker = - std::dynamic_pointer_cast( - workers_[i]); - if (i == 0) { - // we only set reader for the first section - this_worker->SetDataFeed(reader); - this_worker->SetReaderPlace(place); - } - this_worker->SetThreadIndex(i); - this_worker->SetSectionIndex(i); - this_worker->SetPlace(place); - this_worker->Initialize(trainer_desc); - this_worker->SetMicrobatchNum(num_microbatches_); - } // set debug here SetDebug(trainer_desc.debug()); } @@ -119,7 +134,52 @@ void PipelineTrainer::InitDumpEnv() { } } -void PipelineTrainer::CopyParameters(int section_id, int microbatch_id, +// void PipelineTrainer::CopyParameters(int section_id, int microbatch_id, +// const ProgramDesc& program, +// const platform::Place& place) { +// auto& global_block = program.Block(0); +// std::map param_map; +// for (auto& var : global_block.AllVars()) { +// if (var->Persistable()) { +// param_map[var->Name()] = 1; +// } +// } +// for (auto& var : global_block.AllVars()) { +// bool is_param_grad = false; +// size_t pos = 0; +// if ((pos = var->Name().find(kGradVarSuffix)) != std::string::npos) { +// auto prefix_name = var->Name().substr(0, pos); +// if (param_map.find(prefix_name) != param_map.end()) { +// is_param_grad = true; +// } +// } +// VLOG(3) << "Var name: " << var->Name(); +// if ((var->Persistable() || is_param_grad) && microbatch_id == 0) { +// auto* ptr = root_scope_->FindVar(var->Name()); +// auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name()); +// VLOG(3) << "Create persistable var " << var->Name() << " for minibatch +// " +// << section_id << ", which pointer is " << new_ptr; +// InitializeVariable(new_ptr, var->GetType()); +// if (is_param_grad) { +// continue; +// } +// const LoDTensor& root_tensor = ptr->Get(); +// LoDTensor* minibatch_tensor = new_ptr->GetMutable(); +// TensorCopy(*static_cast(&root_tensor), place, +// static_cast(minibatch_tensor)); +// } else if (!var->Persistable() && !is_param_grad) { +// auto* ptr = +// microbatch_scopes_[section_id][microbatch_id]->Var(var->Name()); +// VLOG(3) << "Create variable " << var->Name() << " for section " +// << section_id << " microbatch " << microbatch_id +// << ", which pointer is " << ptr; +// InitializeVariable(ptr, var->GetType()); +// } +// } +// } + +void PipelineTrainer::CopyParameters(int microbatch_id, const ProgramDesc& program, const platform::Place& place) { auto& global_block = program.Block(0); @@ -139,45 +199,57 @@ void PipelineTrainer::CopyParameters(int section_id, int microbatch_id, } } VLOG(3) << "Var name: " << var->Name(); - if ((var->Persistable() || is_param_grad) && microbatch_id == 0) { - auto* ptr = root_scope_->FindVar(var->Name()); - auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name()); - VLOG(3) << "Create persistable var " << var->Name() << " for minibatch " - << section_id << ", which pointer is " << new_ptr; - InitializeVariable(new_ptr, var->GetType()); - if (is_param_grad) { - continue; - } - const LoDTensor& root_tensor = ptr->Get(); - LoDTensor* minibatch_tensor = new_ptr->GetMutable(); - TensorCopy(*static_cast(&root_tensor), place, - static_cast(minibatch_tensor)); + if (is_param_grad && microbatch_id == 0) { + auto* ptr = 
minibatch_scope_->Var(var->Name()); + InitializeVariable(ptr, var->GetType()); + VLOG(3) << "Create grad for persistable var: " << var->Name() + << ", which pointer is " << ptr; } else if (!var->Persistable() && !is_param_grad) { - auto* ptr = - microbatch_scopes_[section_id][microbatch_id]->Var(var->Name()); - VLOG(3) << "Create variable " << var->Name() << " for section " - << section_id << " microbatch " << microbatch_id + auto* ptr = microbatch_scopes_[microbatch_id]->Var(var->Name()); + VLOG(3) << "Create variable " << var->Name() << " microbatch " << ", which pointer is " << ptr; InitializeVariable(ptr, var->GetType()); } } } -void PipelineTrainer::GetSkipVars(int section_id, const ProgramDesc& program) { +// void PipelineTrainer::GetSkipVars(int section_id, const ProgramDesc& program) +// { +// auto& global_block = program.Block(0); +// for (auto& op : global_block.AllOps()) { +// if (op->Type() != "enqueue") { +// continue; +// } +// auto input_arg_names = op->InputArgumentNames(); +// PADDLE_ENFORCE_EQ(input_arg_names.size(), 1, +// platform::errors::InvalidArgument( +// "Number of input arguments for enqueue op must be +// 1, " +// "but the value is %d.", +// input_arg_names.size())); +// std::string input_arg_name = input_arg_names[0]; +// if (input_arg_name.rfind("@GRAD") != input_arg_name.size() - 5) { +// skip_vars_[section_id].emplace_back(input_arg_name); +// VLOG(3) << "add skip var name: " << input_arg_name; +// } +// } +// } + +void PipelineTrainer::GetSkipVars(const ProgramDesc& program) { auto& global_block = program.Block(0); for (auto& op : global_block.AllOps()) { - if (op->Type() != "enqueue") { + if (op->Type() != "c_send") { continue; } auto input_arg_names = op->InputArgumentNames(); PADDLE_ENFORCE_EQ(input_arg_names.size(), 1, platform::errors::InvalidArgument( - "Number of input arguments for enqueue op must be 1, " - "but the value is %d.", + "Number of input arguments for c_send op must be 1, " + "but the value given is %d.", input_arg_names.size())); std::string input_arg_name = input_arg_names[0]; if (input_arg_name.rfind("@GRAD") != input_arg_name.size() - 5) { - skip_vars_[section_id].emplace_back(input_arg_name); + skip_vars_.emplace_back(input_arg_name); VLOG(3) << "add skip var name: " << input_arg_name; } } @@ -185,86 +257,101 @@ void PipelineTrainer::GetSkipVars(int section_id, const ProgramDesc& program) { void PipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program, const platform::Place& place) { - PADDLE_ENFORCE_NOT_NULL(root_scope_, - platform::errors::InvalidArgument( - "root_scope pointer can not be nullptr")); + PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument( + "root_scope_ can not be nullptr")); auto start_cpu_id = trainer_desc_.section_param().start_cpu_core_id(); SectionWorker::cpu_id_.store(start_cpu_id); - minibatch_scopes_.resize(section_num_); - microbatch_scopes_.resize(section_num_); - skip_vars_.resize(section_num_); + // minibatch_scopes_.resize(section_num_); + // microbatch_scopes_.resize(section_num_); + // minibatch_scopes_.resize(1); + microbatch_scopes_.resize(num_microbatches_); + // skip_vars_.resize(section_num_); VLOG(3) << "Init ScopeQueues and create all scopes"; - for (int i = 0; i < section_num_; ++i) { - minibatch_scopes_[i] = &root_scope_->NewScope(); - std::shared_ptr program; - program.reset(new ProgramDesc( - trainer_desc_.section_param().section_config(i).program_desc())); - microbatch_scopes_[i].resize(num_microbatches_); - for (int j = 0; j < num_microbatches_; ++j) { - 
microbatch_scopes_[i][j] = &minibatch_scopes_[i]->NewScope(); - CopyParameters(i, j, *program, places_[i]); - } - GetSkipVars(i, *program); + // for (int i = 0; i < section_num_; ++i) { + minibatch_scope_ = &root_scope_->NewScope(); + std::shared_ptr program; + program.reset(new ProgramDesc( + trainer_desc_.section_param().section_config().program_desc())); + // trainer_desc_.section_param().section_config(i).program_desc())); + // microbatch_scopes_[i].resize(num_microbatches_); + for (int j = 0; j < num_microbatches_; ++j) { + // microbatch_scopes_[j] = &minibatch_scopes_[i]->NewScope(); + microbatch_scopes_[j] = &minibatch_scope_->NewScope(); + // CopyParameters(i, j, *program, places_[i]); + CopyParameters(j, *program, place_); } + // GetSkipVars(i, *program); + GetSkipVars(*program); + // } - for (int i = 0; i < section_num_; ++i) { - auto this_worker = - std::dynamic_pointer_cast( - workers_[i]); - this_worker->SetRootScope(root_scope_); - this_worker->SetMinibatchScope(minibatch_scopes_[i]); - this_worker->SetMicrobatchScopes(microbatch_scopes_[i]); - this_worker->SetSkipVars(skip_vars_[i]); - } + // for (int i = 0; i < section_num_; ++i) { + auto this_worker = + std::dynamic_pointer_cast(worker_); + // workers_[i]); + this_worker->SetRootScope(root_scope_); + this_worker->SetMinibatchScope(minibatch_scope_); + // this_worker->SetMicrobatchScopes(microbatch_scopes_[i]); + this_worker->SetMicrobatchScopes(microbatch_scopes_); + // this_worker->SetSkipVars(skip_vars_[i]); + //} } void PipelineTrainer::Run() { VLOG(3) << "Going to run"; - for (int i = 0; i < section_num_; ++i) { - if (!debug_) { - section_threads_.push_back( - std::thread(&DeviceWorker::TrainFiles, workers_[i].get())); - } else { - section_threads_.push_back(std::thread( - &DeviceWorker::TrainFilesWithProfiler, workers_[i].get())); - } + // for (int i = 0; i < section_num_; ++i) { + if (!debug_) { + section_thread_ = std::thread(&DeviceWorker::TrainFiles, worker_.get()); + // section_threads_.push_back( + // std::thread(&DeviceWorker::TrainFiles, workers_.get())); + // std::thread(&DeviceWorker::TrainFiles, workers_[i].get())); + } else { + section_thread_ = + std::thread(&DeviceWorker::TrainFilesWithProfiler, worker_.get()); + // section_threads_.push_back(std::thread( + // &DeviceWorker::TrainFilesWithProfiler, workers_.get())); + // &DeviceWorker::TrainFilesWithProfiler, workers_[i].get())); } + //} } void PipelineTrainer::Finalize() { - for (auto& th : section_threads_) { - th.join(); - } + // for (auto& th : section_threads_) { + // th.join(); + //} + section_thread_.join(); if (need_dump_field_) { FinalizeDumpEnv(); } - VLOG(3) << "copying back parameters. "; - for (int i = 0; i < section_num_; ++i) { - std::shared_ptr program; - program.reset(new ProgramDesc( - trainer_desc_.section_param().section_config(i).program_desc())); - for (int j = 0; j < num_microbatches_; ++j) { - auto& global_block = program->Block(0); - for (auto& var : global_block.AllVars()) { - if (var->Persistable()) { - auto* ptr = root_scope_->FindVar(var->Name()); - LoDTensor* root_tensor = ptr->GetMutable(); - auto* minibatch_ptr = minibatch_scopes_[i]->Var(var->Name()); - const LoDTensor& minibatch_tensor = minibatch_ptr->Get(); - TensorCopy(*static_cast(&minibatch_tensor), places_[0], - static_cast(root_tensor)); - VLOG(3) << "Copy persitable var " << var->Name() << " to root scope"; - } - } - } - } + // VLOG(3) << "copying back parameters. 
"; + // for (int i = 0; i < section_num_; ++i) { + // std::shared_ptr program; + // program.reset(new ProgramDesc( + // trainer_desc_.section_param().section_config(i).program_desc())); + // for (int j = 0; j < num_microbatches_; ++j) { + // auto& global_block = program->Block(0); + // for (auto& var : global_block.AllVars()) { + // if (var->Persistable()) { + // auto* ptr = root_scope_->FindVar(var->Name()); + // LoDTensor* root_tensor = ptr->GetMutable(); + // auto* minibatch_ptr = minibatch_scopes_[i]->Var(var->Name()); + // const LoDTensor& minibatch_tensor = + // minibatch_ptr->Get(); + // TensorCopy(*static_cast(&minibatch_tensor), + // places_[0], + // static_cast(root_tensor)); + // VLOG(3) << "Copy persitable var " << var->Name() << " to root + // scope"; + // } + // } + // } + // } root_scope_->DropKids(); - SectionWorker::ResetBatchId(); + // SectionWorker::ResetBatchId(); } Scope* PipelineTrainer::GetWorkerScope(int thread_id) { - return microbatch_scopes_[thread_id][0]; + return microbatch_scopes_[0]; } } // end namespace framework diff --git a/paddle/fluid/framework/section_worker.cc b/paddle/fluid/framework/section_worker.cc index 068ed73759e..29beffdbd58 100644 --- a/paddle/fluid/framework/section_worker.cc +++ b/paddle/fluid/framework/section_worker.cc @@ -31,16 +31,17 @@ namespace paddle { namespace framework { std::atomic SectionWorker::cpu_id_(0); -std::mutex SectionWorker::thread_mutex; -std::mutex SectionWorker::cout_mutex; -std::condition_variable SectionWorker::thread_condition; -bool SectionWorker::threads_completed = false; +// std::mutex SectionWorker::thread_mutex; +// std::mutex SectionWorker::cout_mutex; +// std::condition_variable SectionWorker::thread_condition; +// bool SectionWorker::threads_completed = false; uint64_t SectionWorker::batch_id_(0); void SectionWorker::Initialize(const TrainerDesc& desc) { dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_); - program_.reset(new ProgramDesc( - desc.section_param().section_config(section_id_).program_desc())); + program_.reset( + new ProgramDesc(desc.section_param().section_config().program_desc())); + // desc.section_param().section_config(section_id_).program_desc())); for (auto& op_desc : program_->Block(0).AllOps()) { ops_.push_back(OpRegistry::CreateOp(*op_desc)); } @@ -82,7 +83,7 @@ void SectionWorker::AutoSetCPUAffinity(bool reuse) { void SectionWorker::TrainFiles() { VLOG(3) << "begin section_worker TrainFiles"; - AutoSetCPUAffinity(true); + // AutoSetCPUAffinity(true); int64_t max_memory_size = 0; std::unique_ptr gc; @@ -106,201 +107,86 @@ void SectionWorker::TrainFiles() { platform::Timer batch_timer; - if (thread_id_ == 0) { - while (true) { - // Start a minibatch. - // real number of microbatches run - int real_microbatch_num = 0; - batch_timer.Start(); - for (int i = 0; i < num_microbatches_; ++i) { - try { - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - // We run op with op_role = kLRSched only for the first microbatch - // to avoid increasing the @LR_DECAY_STEP@ multiple times. 
- bool run_first_mbatch = - op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)) || - op_role == static_cast(OpRole::kLRSched); - bool run_others = op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)); - if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - } - } - } catch (platform::EOFException&) { - std::unique_lock lk(thread_mutex); - threads_completed = true; - VLOG(3) << "thread " << thread_id_ << " completed."; - VLOG(3) << "called notify all"; - thread_condition.notify_all(); - VLOG(0) << "EOF encountered"; - break; - } - { - real_microbatch_num += 1; - batch_id_ += 1; - VLOG(3) << "called notify all"; - std::unique_lock lk(thread_mutex); - thread_condition.notify_all(); - } - } - dev_ctx_->Wait(); - - VLOG(0) << "real_microbatch_num for thread 0 " << real_microbatch_num; - // backward pass - for (int i = 0; i < real_microbatch_num; ++i) { - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kBackward) || - op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - } - } - } - dev_ctx_->Wait(); - if (real_microbatch_num == 0) { - batch_timer.Pause(); - VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); - return; - } - // update pass + // if (thread_id_ == 0) { + // while (true) { + // Start a minibatch. + // real number of microbatches run + // int real_microbatch_num = 0; + batch_timer.Start(); + for (int i = 0; i < num_microbatches_; ++i) { + try { for (auto& op : ops_) { int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kOptimize)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for minibatch scope"; - op->Run(*microbatch_scopes_[0], place_); + // We run op with op_role = kLRSched only for the first microbatch + // to avoid increasing the @LR_DECAY_STEP@ multiple times. 
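        // Forward and forward|loss ops are selected for every microbatch below;
        // ops tagged kLRSched are additionally selected, but only when i == 0,
        // so the learning-rate decay step advances once per minibatch.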
+ bool run_first_mbatch = op_role == static_cast(OpRole::kForward) || + op_role == (static_cast(OpRole::kForward) | + static_cast(OpRole::kLoss)) || + op_role == static_cast(OpRole::kLRSched); + bool run_others = op_role == static_cast(OpRole::kForward) || + op_role == (static_cast(OpRole::kForward) | + static_cast(OpRole::kLoss)); + if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { + VLOG(3) << "running an op " << op->Type() << " for scope " << i; + op->Run(*microbatch_scopes_[i], place_); if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], - op.get(), unused_vars_, gc.get()); + DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, + gc.get()); } } } - dev_ctx_->Wait(); - batch_timer.Pause(); - VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); - { - std::unique_lock lk(thread_mutex); - if (threads_completed) { - return; - } - } + } catch (platform::EOFException&) { + // std::unique_lock lk(thread_mutex); + // threads_completed = true; + VLOG(3) << "thread completed."; + // VLOG(3) << "called notify all"; + // thread_condition.notify_all(); + VLOG(0) << "EOF encountered"; + break; } - } else { - while (true) { - // forward pass: - bool local_completed = false; - int real_microbatch_num = 0; - for (int i = 0; i < num_microbatches_; ++i) { - { - PADDLE_ENFORCE_LE( - local_batch_id_, batch_id_, - platform::errors::InvalidArgument( - "local_batch_id_ (%d) must be less than or equal to " - "batch_id_ (%d)", - local_batch_id_, batch_id_)); - std::unique_lock lk(thread_mutex); - if (local_batch_id_ == batch_id_ && !threads_completed) { - thread_condition.wait(lk); - } - VLOG(3) << "thread " << thread_id_ << " local_batch_id_ " - << local_batch_id_ << " batch_id_ " << batch_id_; - if (threads_completed) { - VLOG(3) << "thread " << thread_id_ << " completed."; - lk.unlock(); - threads_completed = false; - local_completed = true; - break; - } - lk.unlock(); - local_batch_id_ += 1; - real_microbatch_num += 1; - } - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - // We run op with op_role = kLRSched only for the first microbatch - // to avoid increasing the @LR_DECAY_STEP@ multiple times. 
- bool run_first_mbatch = - op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)) || - op_role == static_cast(OpRole::kLRSched); - bool run_others = op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)); - if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - } - } - } - dev_ctx_->Wait(); - // backward pass - for (int i = 0; i < real_microbatch_num; ++i) { - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kBackward) || - op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - } + } + dev_ctx_->Wait(); + + // backward pass + for (int i = 0; i < num_microbatches_; ++i) { + for (auto& op : ops_) { + int op_role = op->Attr(std::string("op_role")); + if (op_role == static_cast(OpRole::kBackward) || + op_role == (static_cast(OpRole::kBackward) | + static_cast(OpRole::kLoss))) { + VLOG(3) << "running an op " << op->Type() << " for scope " << i; + op->Run(*microbatch_scopes_[i], place_); + if (gc) { + DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, + gc.get()); } } - dev_ctx_->Wait(); - // update pass - if (real_microbatch_num == 0) { - return; - } - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kOptimize)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for minibatch scope"; - op->Run(*microbatch_scopes_[0], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], - op.get(), unused_vars_, gc.get()); - } + } + } + dev_ctx_->Wait(); + // update pass + for (auto& op : ops_) { + int op_role = op->Attr(std::string("op_role")); + if (op_role == static_cast(OpRole::kOptimize)) { + VLOG(3) << "running an op " << op->Type() << " for minibatch scope"; + op->Run(*microbatch_scopes_[0], place_); + if (gc) { + for (int i = 0; i < num_microbatches_; ++i) { + DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, + gc.get()); } } - dev_ctx_->Wait(); - if (local_completed) { - return; - } } } + dev_ctx_->Wait(); + batch_timer.Pause(); + VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); + ++batch_id_; } void SectionWorker::TrainFilesWithProfiler() { VLOG(3) << "begin section_worker TrainFiles with profiler"; - AutoSetCPUAffinity(true); + // AutoSetCPUAffinity(true); platform::Timer batch_timer; platform::Timer timeline; @@ -342,183 +228,42 @@ void SectionWorker::TrainFilesWithProfiler() { } #endif - if (thread_id_ == 0) { - struct timeval start; - struct timeval end; - struct timeval micro_start; - struct timeval micro_end; - while (true) { - // Start a minibatch. 
- batch_timer.Start(); - int real_microbatch_num = 0; - for (int i = 0; i < num_microbatches_; ++i) { - try { - int op_idx = 0; - gettimeofday(µ_start, NULL); - for (auto& op : ops_) { - gettimeofday(&start, NULL); - int op_role = op->Attr(std::string("op_role")); - // We run op with op_role = kLRSched only for the first microbatch - // to avoid increasing the @LR_DECAY_STEP@ multiple times. - bool run_first_mbatch = - op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)) || - op_role == static_cast(OpRole::kLRSched); - bool run_others = op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)); - if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - timeline.Start(); - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - cudaDeviceSynchronize(); - timeline.Pause(); - gettimeofday(&end, NULL); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - { - std::unique_lock lk(cout_mutex); - std::cout << std::fixed; - std::cout.precision(0); - std::cout << "::FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:SCOPE[" << i << "]:OP[" << op->Type() - << "]:START[" << start.tv_sec * 1e6 + start.tv_usec - << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" - << std::endl; - } - } - op_idx++; - } - gettimeofday(µ_end, NULL); - { - std::unique_lock lk(cout_mutex); - std::cout << std::fixed; - std::cout.precision(0); - std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:START[" - << micro_start.tv_sec * 1e6 + micro_start.tv_usec - << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec - << "]" << std::endl; - } - } catch (platform::EOFException&) { - std::unique_lock lk(thread_mutex); - threads_completed = true; - VLOG(3) << "thread " << thread_id_ << " completed."; - VLOG(3) << "called notify all"; - thread_condition.notify_all(); - VLOG(0) << "EOF encountered"; - VLOG(0) << "============timeline============"; - for (size_t i = 0; i < ops_.size(); ++i) { - VLOG(0) << "op: " << op_name[i] << ", max_time: " << op_max_time[i] - << ", min_time: " << op_min_time[i] - << ", mean_time: " << op_total_time[i] / op_count[i]; - } - VLOG(0) << "================================"; - break; - } - { - VLOG(3) << "called notify all"; - std::unique_lock lk(thread_mutex); - real_microbatch_num += 1; - batch_id_ += 1; - thread_condition.notify_all(); - } - } - dev_ctx_->Wait(); - // backward pass - for (int i = 0; i < real_microbatch_num; ++i) { - int op_idx = 0; - gettimeofday(µ_start, NULL); - for (auto& op : ops_) { - gettimeofday(&start, NULL); - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kBackward) || - op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - timeline.Start(); - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - cudaDeviceSynchronize(); - gettimeofday(&end, NULL); - timeline.Pause(); - auto time = 
timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - { - std::unique_lock lk(cout_mutex); - std::cout << std::fixed; - std::cout.precision(0); - std::cout << "::BWD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:SCOPE[" << i << "]:OP[" << op->Type() - << "]:START[" << start.tv_sec * 1e6 + start.tv_usec - << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" - << std::endl; - } - } - op_idx++; - } - gettimeofday(µ_end, NULL); - { - std::unique_lock lk(cout_mutex); - std::cout << std::fixed; - std::cout.precision(0); - std::cout << "!!BWD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:START[" - << micro_start.tv_sec * 1e6 + micro_start.tv_usec - << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec - << "]" << std::endl; - } - } - dev_ctx_->Wait(); - if (real_microbatch_num == 0) { - batch_timer.Pause(); - VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); - return; - } - // update pass + // if (thread_id_ == 0) { + struct timeval start; + struct timeval end; + struct timeval micro_start; + struct timeval micro_end; + // Start a minibatch. + batch_timer.Start(); + int real_microbatch_num = 0; + for (int i = 0; i < num_microbatches_; ++i) { + try { int op_idx = 0; gettimeofday(µ_start, NULL); for (auto& op : ops_) { gettimeofday(&start, NULL); int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kOptimize)) { + // We run op with op_role = kLRSched only for the first microbatch + // to avoid increasing the @LR_DECAY_STEP@ multiple times. + bool run_first_mbatch = op_role == static_cast(OpRole::kForward) || + op_role == (static_cast(OpRole::kForward) | + static_cast(OpRole::kLoss)) || + op_role == static_cast(OpRole::kLRSched); + bool run_others = op_role == static_cast(OpRole::kForward) || + op_role == (static_cast(OpRole::kForward) | + static_cast(OpRole::kLoss)); + if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for minibatch scope"; + << " for scope " << i; timeline.Start(); - op->Run(*microbatch_scopes_[0], place_); + op->Run(*microbatch_scopes_[i], place_); if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], - op.get(), unused_vars_, gc.get()); + DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, + gc.get()); } cudaDeviceSynchronize(); - gettimeofday(&end, NULL); timeline.Pause(); + gettimeofday(&end, NULL); auto time = timeline.ElapsedUS(); op_total_time[op_idx] += time; if (time > op_max_time[op_idx]) { @@ -530,12 +275,11 @@ void SectionWorker::TrainFilesWithProfiler() { op_count[op_idx] += 1; op_total_time[op_idx] += time; { - std::unique_lock lk(cout_mutex); + // std::unique_lock lk(cout_mutex); std::cout << std::fixed; std::cout.precision(0); - std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:SCOPE[" << num_microbatches_ << "]:OP[" - << op->Type() << "]:START[" + std::cout << "::FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:SCOPE[" << i << "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; } @@ -544,250 +288,142 @@ void SectionWorker::TrainFilesWithProfiler() { } gettimeofday(µ_end, NULL); { - std::unique_lock lk(cout_mutex); + // std::unique_lock lk(cout_mutex); std::cout << std::fixed; 
std::cout.precision(0); - std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ + std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl; } - dev_ctx_->Wait(); - batch_timer.Pause(); - VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); - { - std::unique_lock lk(thread_mutex); - if (threads_completed) { - return; - } + } catch (platform::EOFException&) { + VLOG(3) << "thread completed."; + VLOG(0) << "EOF encountered"; + VLOG(0) << "============timeline============"; + for (size_t i = 0; i < ops_.size(); ++i) { + VLOG(0) << "op: " << op_name[i] << ", max_time: " << op_max_time[i] + << ", min_time: " << op_min_time[i] + << ", mean_time: " << op_total_time[i] / op_count[i]; } + VLOG(0) << "================================"; + break; } - } else { - struct timeval start; - struct timeval end; - struct timeval micro_start; - struct timeval micro_end; - cudaEvent_t cu_start, cu_stop; - cudaEventCreate(&cu_start); - cudaEventCreate(&cu_stop); - bool local_completed = false; - while (true) { - // forward pass: - int real_microbatch_num = 0; - for (int i = 0; i < num_microbatches_; ++i) { - { - PADDLE_ENFORCE_LE( - local_batch_id_, batch_id_, - platform::errors::InvalidArgument( - "local_batch_id_ (%d) must be less than or equal to " - "batch_id_ (%d)", - local_batch_id_, batch_id_)); - std::unique_lock lk(thread_mutex); - if (local_batch_id_ == batch_id_ && !threads_completed) { - thread_condition.wait(lk); - } - VLOG(3) << "thread " << thread_id_ << " local_batch_id_ " - << local_batch_id_ << " batch_id_ " << batch_id_; - if (threads_completed) { - local_completed = true; - VLOG(3) << "thread " << thread_id_ << " completed."; - lk.unlock(); - VLOG(0) << "============timeline============"; - for (size_t i = 0; i < ops_.size(); ++i) { - VLOG(0) << "op: " << op_name[i] - << ", max_time: " << op_max_time[i] - << ", min_time: " << op_min_time[i] - << ", mean_time: " << op_total_time[i] / op_count[i]; - } - VLOG(0) << "================================"; - break; - } - lk.unlock(); - real_microbatch_num += 1; - local_batch_id_ += 1; + } + dev_ctx_->Wait(); + // backward pass + for (int i = 0; i < num_microbatches_; ++i) { + int op_idx = 0; + gettimeofday(µ_start, NULL); + for (auto& op : ops_) { + gettimeofday(&start, NULL); + int op_role = op->Attr(std::string("op_role")); + if (op_role == static_cast(OpRole::kBackward) || + op_role == (static_cast(OpRole::kBackward) | + static_cast(OpRole::kLoss))) { + VLOG(3) << "running an op " << op->Type() << " for scope " << i; + timeline.Start(); + op->Run(*microbatch_scopes_[i], place_); + if (gc) { + DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, + gc.get()); } - int op_idx = 0; - gettimeofday(µ_start, NULL); - for (auto& op : ops_) { - gettimeofday(&start, NULL); - int op_role = op->Attr(std::string("op_role")); - // We run op with op_role = kLRSched only for the first microbatch - // to avoid increasing the @LR_DECAY_STEP@ multiple times. 
- bool run_first_mbatch = - op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)) || - op_role == static_cast(OpRole::kLRSched); - bool run_others = op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)); - if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - timeline.Start(); - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - cudaDeviceSynchronize(); - gettimeofday(&end, NULL); - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - { - std::unique_lock lk(cout_mutex); - std::cout << std::fixed; - std::cout.precision(0); - std::cout << "::FWD:B[" << local_batch_id_ << "]:SEC[" - << thread_id_ << "]:SCOPE[" << i << "]:OP[" - << op->Type() << "]:START[" - << start.tv_sec * 1e6 + start.tv_usec << "]:END[" - << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; - } - } - op_idx++; + cudaDeviceSynchronize(); + gettimeofday(&end, NULL); + timeline.Pause(); + auto time = timeline.ElapsedUS(); + op_total_time[op_idx] += time; + if (time > op_max_time[op_idx]) { + op_max_time[op_idx] = time; + } + if (time < op_min_time[op_idx]) { + op_min_time[op_idx] = time; } - gettimeofday(µ_end, NULL); + op_count[op_idx] += 1; + op_total_time[op_idx] += time; { - std::unique_lock lk(cout_mutex); + // std::unique_lock lk(cout_mutex); std::cout << std::fixed; std::cout.precision(0); - std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:START[" - << micro_start.tv_sec * 1e6 + micro_start.tv_usec - << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec - << "]" << std::endl; + std::cout << "::BWD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:SCOPE[" << i << "]:OP[" << op->Type() << "]:START[" + << start.tv_sec * 1e6 + start.tv_usec << "]:END[" + << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; } } - dev_ctx_->Wait(); - // backward pass - for (int i = 0; i < real_microbatch_num; ++i) { - int op_idx = 0; - gettimeofday(µ_start, NULL); - for (auto& op : ops_) { - gettimeofday(&start, NULL); - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kBackward) || - op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - timeline.Start(); - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - cudaDeviceSynchronize(); - gettimeofday(&end, NULL); - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - { - std::unique_lock lk(cout_mutex); - std::cout << std::fixed; - std::cout.precision(0); - std::cout << "::BWD:B[" << local_batch_id_ << "]:SEC[" - << thread_id_ << "]:SCOPE[" << i << "]:OP[" - << op->Type() << "]:START[" - << start.tv_sec * 1e6 + start.tv_usec << "]:END[" - << 
end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; - } - } - op_idx++; - } - gettimeofday(µ_end, NULL); - { - std::unique_lock lk(cout_mutex); - std::cout << std::fixed; - std::cout.precision(0); - std::cout << "!!BWD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:START[" - << micro_start.tv_sec * 1e6 + micro_start.tv_usec - << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec - << "]" << std::endl; + op_idx++; + } + gettimeofday(µ_end, NULL); + { + // std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "!!BWD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec + << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" + << std::endl; + } + } + dev_ctx_->Wait(); + if (real_microbatch_num == 0) { + batch_timer.Pause(); + VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); + return; + } + // update pass + int op_idx = 0; + gettimeofday(µ_start, NULL); + for (auto& op : ops_) { + gettimeofday(&start, NULL); + int op_role = op->Attr(std::string("op_role")); + if (op_role == static_cast(OpRole::kOptimize)) { + VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ + << " for minibatch scope"; + timeline.Start(); + op->Run(*microbatch_scopes_[0], place_); + if (gc) { + for (int i = 0; i < num_microbatches_; ++i) { + DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, + gc.get()); } } - dev_ctx_->Wait(); - if (real_microbatch_num == 0) { - return; + cudaDeviceSynchronize(); + gettimeofday(&end, NULL); + timeline.Pause(); + auto time = timeline.ElapsedUS(); + op_total_time[op_idx] += time; + if (time > op_max_time[op_idx]) { + op_max_time[op_idx] = time; } - // update pass - int op_idx = 0; - gettimeofday(µ_start, NULL); - for (auto& op : ops_) { - gettimeofday(&start, NULL); - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kOptimize)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for minibatch scope"; - timeline.Start(); - op->Run(*microbatch_scopes_[0], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], - op.get(), unused_vars_, gc.get()); - } - cudaDeviceSynchronize(); - gettimeofday(&end, NULL); - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - { - std::unique_lock lk(cout_mutex); - std::cout << std::fixed; - std::cout.precision(0); - std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:SCOPE[" << num_microbatches_ << "]:OP[" - << op->Type() << "]:START[" - << start.tv_sec * 1e6 + start.tv_usec << "]:END[" - << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; - } - } - op_idx++; + if (time < op_min_time[op_idx]) { + op_min_time[op_idx] = time; } - gettimeofday(µ_end, NULL); + op_count[op_idx] += 1; + op_total_time[op_idx] += time; { - std::unique_lock lk(cout_mutex); std::cout << std::fixed; std::cout.precision(0); - std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ - << "]:START[" - << micro_start.tv_sec * 1e6 + micro_start.tv_usec << "]:END[" - << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" + std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:SCOPE[" << num_microbatches_ << "]:OP[" << op->Type() + << "]:START[" << start.tv_sec * 1e6 + 
start.tv_usec + << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; } - dev_ctx_->Wait(); - if (local_completed) { - return; - } } + op_idx++; + } + gettimeofday(µ_end, NULL); + { + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:START[" + << micro_start.tv_sec * 1e6 + micro_start.tv_usec << "]:END[" + << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl; } + dev_ctx_->Wait(); + batch_timer.Pause(); + VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); + ++batch_id_; } } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index 1f97024d970..b66ab9e4131 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -15,6 +15,7 @@ limitations under the License. */ #pragma once #include +#include #include #include // NOLINT #include @@ -217,28 +218,35 @@ class PipelineTrainer : public TrainerBase { virtual Scope* GetWorkerScope(int thread_id); void InitDumpEnv() override; virtual std::string GetDumpPath(int tid); - void GetSkipVars(int section_id, const ProgramDesc& main_program); + void GetSkipVars(const ProgramDesc& main_program); protected: - int section_num_; + // int section_num_; int num_microbatches_; int start_cpu_core_id_; - std::vector places_; - std::vector> skip_vars_; + // std::vector places_; + platform::Place place_; + // std::vector> skip_vars_; + std::vector skip_vars_; TrainerDesc trainer_desc_; - std::vector section_threads_; + // std::vector section_threads_; + std::thread section_thread_; // worker: [section_id] - std::vector> workers_; + // std::vector> workers_; + std::shared_ptr worker_; // minibatch_scopes_: [section_id] - std::vector minibatch_scopes_; + // std::vector minibatch_scopes_; + Scope* minibatch_scope_; // microbatch_scopes_: [section_id][microbatch_id] - std::vector> microbatch_scopes_; - - void CopyParameters(int section_id, int microbatch_id, - const ProgramDesc& program, const platform::Place& place); - bool isPersistableVarGrad(std::string name); - bool isPersistable(VarDesc* var); + // std::vector> microbatch_scopes_; + // microbatch_scopes_: [microbatch_id] + std::vector microbatch_scopes_; + + void CopyParameters(int microbatch_id, const ProgramDesc& program, + const platform::Place& place); + // bool isPersistableVarGrad(std::string name); + // bool isPersistable(VarDesc* var); }; #endif diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto index 1985742fc4a..a83107c0d2e 100644 --- a/paddle/fluid/framework/trainer_desc.proto +++ b/paddle/fluid/framework/trainer_desc.proto @@ -84,7 +84,7 @@ message DownpourWorkerParameter { } message SectionWorkerParameter { - repeated SectionConfig section_config = 1; + SectionConfig section_config = 1; optional int32 queue_size = 2 [ default = 1 ]; optional int64 sync_steps = 3 [ default = 1 ]; optional int32 start_cpu_core_id = 4 [ default = 1 ]; diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 2dd654c35c3..a74071b5ca5 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -3784,6 +3784,7 @@ class PipelineOptimizer(object): Args: main_program (Program): the main program + devices: all used devices """ programs = [] # Map from device to its corresponding section program info @@ -3910,10 +3911,10 @@ class PipelineOptimizer(object): data_devices_map[var_name].append(dev_spec) return data_devices_map - 
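# The renamed helpers below (_insert_sendrecv_for_data_var here and
# _insert_sendrecv_ops_for_boundaries further down) replace the old
# queue_generator / enqueue / dequeue plumbing with direct c_send / c_recv
# pairs between devices. A minimal, framework-free sketch of that insertion
# pattern follows; the plain-dict op representation and the helper name are
# illustrative only and are not Paddle APIs.
def _sketch_insert_send_recv(ops):
    """ops: dicts with 'type', 'device', 'inputs', 'outputs' (var name lists).

    Returns a new op list with a c_send/c_recv pair inserted just before any
    op that consumes a variable produced earlier on a different device, with
    at most one pair per (variable, consuming device) -- mirroring the
    deduplication done with var_devspec in the real pass.
    """
    producer_dev = {}  # var name -> device that produced it
    handled = set()    # (var name, consuming device) pairs already wired up
    result = []
    for op in ops:
        for var in op['inputs']:
            src = producer_dev.get(var)
            if src is None or src == op['device'] or (var, op['device']) in handled:
                continue
            result.append({'type': 'c_send', 'device': src,
                           'inputs': [var], 'outputs': []})
            result.append({'type': 'c_recv', 'device': op['device'],
                           'inputs': [], 'outputs': [var]})
            handled.add((var, op['device']))
        result.append(op)
        for var in op['outputs']:
            producer_dev[var] = op['device']
    return result

# Example: a var 'h' produced on gpu:0 and consumed on gpu:1 gets one c_send
# on gpu:0 followed by one c_recv on gpu:1, placed right before the consumer:
#   _sketch_insert_send_recv([
#       {'type': 'fc', 'device': 'gpu:0', 'inputs': ['x'], 'outputs': ['h']},
#       {'type': 'softmax', 'device': 'gpu:1', 'inputs': ['h'], 'outputs': ['y']}])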
def _insert_enq_deq_for_data_var(self, main_block, programs, startup, - devices): + def _insert_sendrecv_for_data_var(self, main_block, programs, startup, + devices): """ - Insert enqueue and dequeue ops for data var that on other devices. + Insert send and recv ops for data var that on other devices. Args: main_block (Block): Global block for main program @@ -3926,39 +3927,24 @@ class PipelineOptimizer(object): first_prog = programs[0]['program'] first_block = first_prog.block(0) - enqueue_index = 0 + insert_index = 0 for op in first_block.ops: - enqueue_index += 1 + insert_index += 1 if op.type == "read": break first_dev_spec = devices[0] for var_name in data_devices_map.keys(): for device in data_devices_map[var_name]: if device == first_dev_spec: continue - # step1: generate queue for each pair of data var and device - # that that data on - queue_name = var_name + "_blocking_queue" - queue_name = unique_name.generate(queue_name) - queue_var = startup.block(0).create_var( - name=queue_name, - persistable=True, - type=core.VarDesc.VarType.RAW) - startup.block(0).append_op( - type='queue_generator', - attrs={ - 'names': [queue_name], - 'capacity': self._num_microbatches - }) main_var = main_block.var(var_name) assert main_var.is_data if not var_name in first_block.vars: self._create_var(first_block, main_var, var_name) first_block._insert_op( - index=enqueue_index, - type='enqueue', + index=insert_index, + type='c_send', inputs={'X': first_block.var(var_name)}, attrs={ - 'queue_name': queue_name, self._op_device_key: first_dev_spec, self._op_role_key: self._op_role.Forward }) @@ -3972,12 +3958,11 @@ class PipelineOptimizer(object): new_var = self._create_var(block, source_var, var_name) block._insert_op( index=0, - type='dequeue', + type='c_recv', outputs={'Out': [new_var]}, attrs={ self._op_device_key: device, self._op_role_key: self._op_role.Forward, - 'queue_name': queue_name, }) def _strip_grad_suffix(self, name): @@ -4080,23 +4065,22 @@ class PipelineOptimizer(object): assert sorted_device_specs == device_specs return device_specs - def _insert_enq_deq_ops_for_boundaries(self, block, origin_block, - startup_program): + def _insert_sendrecv_ops_for_boundaries(self, block, origin_block): """ - Insert a pair of enqueue and dequeue ops for every two + Insert a pair of send and recv ops for every two consecutive ops on different devices. """ - startup_block = startup_program.global_block() extra_index = 0 # A map from var to device spec where op takes it as input, - # avoiding multiple enqueue and dequeue ops. + # avoiding multiple send and recv ops. var_devspec = dict() for index, op in list(enumerate(origin_block.ops)): - # skips lr-related op and vars, as we will process them later. + # skips lr-related ops and vars, as we will process them later. if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched): continue + # skips update ops and vars, as we will process them later. 
if self._is_update_op(op): continue cur_device_spec = op.attr(self._op_device_key) @@ -4119,37 +4103,23 @@ class PipelineOptimizer(object): if cur_device_spec in var_devspec[var_name]: continue var_devspec[var_name].append(cur_device_spec) - queue_name = var_name + "_blocking_queue" - queue_name = unique_name.generate(queue_name) - queue_var = startup_block.create_var( - name=queue_name, - persistable=True, - type=core.VarDesc.VarType.RAW) - startup_block.append_op( - type='queue_generator', - attrs={ - 'names': [queue_name], - 'capacity': self._num_microbatches - }) op_role = op.all_attrs()[self._op_role_key] var = block.vars[var_name] block._insert_op( index=index + extra_index, - type='enqueue', + type='c_send', inputs={'X': var}, attrs={ - 'queue_name': queue_name, self._op_device_key: prev_device_spec, self._op_role_key: op_role }) extra_index += 1 block._insert_op( index=index + extra_index, - type='dequeue', + type='c_recv', outputs={'Out': [var]}, attrs={ self._op_device_key: cur_device_spec, - 'queue_name': queue_name, self._op_role_key: op_role }) extra_index += 1 @@ -4178,7 +4148,9 @@ class PipelineOptimizer(object): def _accumulate_gradients(self, block): """ - Accumulate the graident generated in microbatch to the one in mini-batch. + Accumulate the gradients generated in microbatch to the one in mini-batch. + We also scale the loss corresponding to number of micro-batches at + the same time. """ for index, op in reversed(list(enumerate(block.ops))): offset = index @@ -4210,12 +4182,10 @@ class PipelineOptimizer(object): for i in range(0, len(op_role_var), 2): grad_name = op_role_var[i + 1] grad_var = block.vars[grad_name] - param_name = op_role_var[i] - param_var = block.vars[param_name] - new_var_name = unique_name.generate(param_name) - new_var_name = self._append_grad_suffix(new_var_name) - new_var = self._create_var(block, grad_var, new_var_name) - self._rename_arg(op, grad_name, new_var_name) + new_grad_var_name = unique_name.generate(grad_name) + new_var = self._create_var(block, grad_var, + new_grad_var_name) + self._rename_arg(op, grad_name, new_grad_var_name) block._insert_op( index=offset + 1, type='sum', @@ -4247,7 +4217,6 @@ class PipelineOptimizer(object): def _get_device_info(self, block): for op in block.ops: - if not op._has_kernel(op.type): continue op_device = op.attr(self._op_device_key) return op_device @@ -4282,7 +4251,7 @@ class PipelineOptimizer(object): for prog in var_info[var_name]: block = prog.block(0) for op in block.ops: - if op.type == "dequeue": continue + if op.type == "c_recv": continue # We have processed lr related vars if op.attr(self._op_role_key) == int( self._op_role.Optimize.LRSched): @@ -4306,24 +4275,11 @@ class PipelineOptimizer(object): for prog in all_progs: if prog == write_prog: continue - queue_name = var_name + "_blocking_queue" - queue_name = unique_name.generate(queue_name) - queue_var = startup_prog.block(0).create_var( - name=queue_name, - persistable=True, - type=core.VarDesc.VarType.RAW) - startup_prog.block(0).append_op( - type='queue_generator', - attrs={ - 'names': [queue_name], - 'capacity': self._num_microbatches - }) write_block._insert_op( index=0, - type='enqueue', + type='c_send', inputs={'X': write_block.var(var_name), }, attrs={ - 'queue_name': queue_name, self._op_device_key: write_device, # A trick to make the role LRSched to avoid copy every # microbatch @@ -4333,14 +4289,13 @@ class PipelineOptimizer(object): read_device = self._get_device_info(read_block) read_block._insert_op( index=0, - type='dequeue', 
+ type='c_recv', outputs={'Out': [read_block.var(var_name)]}, attrs={ self._op_device_key: read_device, # A trick to make the role LRSched to avoid copy every # microbatch self._op_role_key: self._op_role.LRSched, - 'queue_name': queue_name, }) def minimize(self, @@ -4365,14 +4320,13 @@ class PipelineOptimizer(object): device_specs = self._check_validation(main_block) - # Step3: add enqueue and dequeue ops between section boundaries + # Step3: add send and recv ops between section boundaries origin_prog = main_block.program.clone(for_test=False) origin_main_block = origin_prog.global_block() - self._insert_enq_deq_ops_for_boundaries(main_block, origin_main_block, - startup_program) + self._insert_sendrecv_ops_for_boundaries(main_block, origin_main_block) - # Step4: accumulate gradients during backward - # and clear them after update + # Step4: clear gradients before each mini-batch and + # accumulate gradients during backward self._clear_gradients(main_block) self._accumulate_gradients(main_block) @@ -4392,14 +4346,14 @@ class PipelineOptimizer(object): raise ValueError("Unknown device type: %s", dev_spec) # Step5: split program into sections and add pairs of - # enqueue and dequeue ops for data var. + # send and recv ops for data var. if len(place_list) <= 1: raise ValueError("Run on one device, do not use pipeline.") program_list = self._split_program(main_program, device_specs) for p in program_list: self._create_vars(p["program"].block(0), main_program) - self._insert_enq_deq_for_data_var(main_block, program_list, - startup_program, device_specs) + self._insert_sendrecv_for_data_var(main_block, program_list, + startup_program, device_specs) # Step6: Special Case: process persistable vars that exist in # multiple sections -- GitLab
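For reference, the per-device schedule that the rewritten SectionWorker::TrainFiles implements (all microbatch forward passes, then all backward passes, then one optimizer pass against the first microbatch scope, with LR-scheduling ops run only on the first microbatch) can be summarized by the following framework-free sketch. The OpRole flags, the op list, and the run callable are illustrative stand-ins, not Paddle APIs.

from enum import Flag, auto

class OpRole(Flag):
    FORWARD = auto()
    BACKWARD = auto()
    LOSS = auto()
    OPTIMIZE = auto()
    LR_SCHED = auto()

def run_minibatch(ops, num_microbatches, run):
    """ops: list of (op name, OpRole); run(name, scope_id) executes one op."""
    # Forward pass: forward (and forward|loss) ops run for every microbatch,
    # LR-scheduling ops only for microbatch 0.
    for i in range(num_microbatches):
        for name, role in ops:
            fwd = role in (OpRole.FORWARD, OpRole.FORWARD | OpRole.LOSS)
            if fwd or (i == 0 and role == OpRole.LR_SCHED):
                run(name, i)
    # Backward pass over the same microbatches.
    for i in range(num_microbatches):
        for name, role in ops:
            if role in (OpRole.BACKWARD, OpRole.BACKWARD | OpRole.LOSS):
                run(name, i)
    # Update pass: optimizer ops run once, against the first microbatch scope.
    for name, role in ops:
        if role == OpRole.OPTIMIZE:
            run(name, 0)

if __name__ == "__main__":
    ops = [("fc_fwd", OpRole.FORWARD),
           ("loss", OpRole.FORWARD | OpRole.LOSS),
           ("lr_decay", OpRole.LR_SCHED),
           ("fc_bwd", OpRole.BACKWARD),
           ("sgd", OpRole.OPTIMIZE)]
    run_minibatch(ops, 4, lambda name, i: print("run %s on microbatch %d" % (name, i)))

Because each trainer process now drives exactly one section with this single-threaded schedule, the old cross-section synchronization state (thread_mutex, thread_condition, threads_completed, local_batch_id_) is no longer needed, which is why those members are commented out in device_worker.h and section_worker.cc above.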