From f77a78cdee33b84aaab3a73e278ae7e65a7dd929 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Mon, 23 Nov 2020 22:29:57 +0800 Subject: [PATCH] enable pipeline to run with Executor.run() (#28373) * update, test=develop --- paddle/fluid/framework/device_worker.h | 14 +- paddle/fluid/framework/pipeline_trainer.cc | 237 ++------ paddle/fluid/framework/section_worker.cc | 558 ++---------------- paddle/fluid/framework/trainer.h | 27 +- paddle/fluid/framework/trainer_desc.proto | 2 +- .../meta_optimizers/pipeline_optimizer.py | 134 +++-- python/paddle/fluid/device_worker.py | 30 +- python/paddle/fluid/executor.py | 54 +- python/paddle/fluid/optimizer.py | 484 +++++++-------- .../fluid/tests/unittests/CMakeLists.txt | 4 +- .../fluid/tests/unittests/pipeline_mnist.py | 136 +++++ .../fluid/tests/unittests/test_dist_base.py | 120 +++- .../test_fleet_pipeline_meta_optimizer.py | 11 +- .../fluid/tests/unittests/test_pipeline.py | 226 +------ 14 files changed, 780 insertions(+), 1257 deletions(-) mode change 100755 => 100644 python/paddle/fluid/optimizer.py create mode 100644 python/paddle/fluid/tests/unittests/pipeline_mnist.py diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index a254248fea..e81e0c66f9 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -540,7 +540,7 @@ class HeterBoxWorker : public HogwildWorker { #if defined(PADDLE_WITH_NCCL) class SectionWorker : public DeviceWorker { public: - SectionWorker() { local_batch_id_ = 0; } + SectionWorker() {} ~SectionWorker() override {} void Initialize(const TrainerDesc& desc) override; @@ -549,13 +549,12 @@ class SectionWorker : public DeviceWorker { void CreateDeviceResource(const ProgramDesc& main_prog) override{}; void TrainFiles() override; - void TrainFilesWithProfiler() override; + void TrainFilesWithProfiler() override{}; void PrintFetchVars() override {} const platform::Place& place() const { return place_; } - void SetSectionIndex(int section_id) { section_id_ = section_id; } void SetDeviceIndex(int tid) override {} void SetThreadIndex(int thread_id) { thread_id_ = thread_id; } void SetMicrobatchNum(int num) { num_microbatches_ = num; } @@ -566,13 +565,8 @@ class SectionWorker : public DeviceWorker { void SetSkipVars(const std::vector& skip_vars) { skip_vars_ = skip_vars; } - static void ResetBatchId() { batch_id_ = 0; } - static void ResetThreadCompletedFlag() { threads_completed = false; } - - static std::atomic cpu_id_; protected: - void AutoSetCPUAffinity(bool reuse); int section_id_; int thread_id_; int num_microbatches_; @@ -581,12 +575,8 @@ class SectionWorker : public DeviceWorker { const Scope* minibatch_scope_; std::vector> ops_; - static std::mutex thread_mutex; - static std::condition_variable thread_condition; - static bool threads_completed; std::shared_ptr program_; static uint64_t batch_id_; - uint64_t local_batch_id_; platform::DeviceContext* dev_ctx_ = nullptr; }; diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index d7506edbf4..58e0920329 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -13,6 +13,7 @@ // limitations under the License. #if defined(PADDLE_WITH_NCCL) +#include #include "paddle/fluid/framework/data_feed_factory.h" #include "paddle/fluid/framework/device_worker_factory.h" #include "paddle/fluid/framework/trainer.h" @@ -26,83 +27,25 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc, const auto& section_params = trainer_desc.section_param(); num_microbatches_ = section_params.num_microbatches(); VLOG(3) << "Number of microbatches per minibatch: " << num_microbatches_; - section_num_ = section_params.section_config_size(); - VLOG(3) << "Number of program sections: " << section_num_; trainer_desc_ = trainer_desc; - start_cpu_core_id_ = section_params.start_cpu_core_id(); - SetDataset(dataset); ParseDumpConfig(trainer_desc); - // get filelist from trainer_desc here - const std::vector readers = - dataset->GetReaders(); - VLOG(3) << "readers num: " << readers.size(); - int num_readers = readers.size(); - PADDLE_ENFORCE_EQ(num_readers, 1, - platform::errors::InvalidArgument( - "Number of dataset readers for pipeline " - "must be 1 now, but the value you give is %d.", - num_readers)); - auto* reader = readers[0]; - feed_var_names_ = reader->GetUseSlotAlias(); - - workers_.resize(section_num_); - for (int i = 0; i < section_num_; ++i) { - const auto& section_config = section_params.section_config(i); - platform::Place place; - int place_id = section_config.place_id(); - switch (section_config.place()) { - case SectionConfig::CPUPlace: - place = platform::CPUPlace(); - break; - case SectionConfig::CUDAPlace: - // Note that one section has at most one GPU place in one pipeline - PADDLE_ENFORCE_GE( - place_id, 0, - platform::errors::InvalidArgument( - "The place_id value for CUDAPlace shoud be greater " - "than or equal to 0, but the value you give is %d.", - place_id)); - place = platform::CUDAPlace(place_id); - break; - case SectionConfig::CUDAPinnedPlace: - place = platform::CUDAPinnedPlace(); - break; - default: - PADDLE_ENFORCE_NOT_NULL(nullptr, - platform::errors::InvalidArgument( - "Unkown place type in SectionConfig: %d", - section_config.place())); - } - places_.emplace_back(place); - VLOG(3) << "Device worker place: " << place << ", device id: " << place_id - << ", section: " << i; - - workers_[i] = DeviceWorkerFactory::CreateDeviceWorker( - trainer_desc.device_worker_name()); - auto this_worker = - std::dynamic_pointer_cast( - workers_[i]); - if (i == 0) { - // we only set reader for the first section - this_worker->SetDataFeed(reader); - this_worker->SetReaderPlace(place); - } - this_worker->SetThreadIndex(i); - this_worker->SetSectionIndex(i); - this_worker->SetPlace(place); - this_worker->Initialize(trainer_desc); - this_worker->SetMicrobatchNum(num_microbatches_); - } - // set debug here - SetDebug(trainer_desc.debug()); + const auto& section_config = section_params.section_config(); + int place_id = section_config.place_id(); + place_ = platform::CUDAPlace(place_id); + worker_ = DeviceWorkerFactory::CreateDeviceWorker( + trainer_desc.device_worker_name()); + auto this_worker = + std::dynamic_pointer_cast(worker_); + this_worker->SetPlace(place_); + this_worker->Initialize(trainer_desc); + this_worker->SetMicrobatchNum(num_microbatches_); } void PipelineTrainer::InitOtherEnv(const ProgramDesc& main_program) { if (need_dump_field_) { InitDumpEnv(); } - VLOG(3) << "init other env done."; } std::string PipelineTrainer::GetDumpPath(int tid) { @@ -119,143 +62,87 @@ void PipelineTrainer::InitDumpEnv() { } } -void PipelineTrainer::CopyParameters(int section_id, int microbatch_id, +void PipelineTrainer::CopyParameters(int microbatch_id, const ProgramDesc& program, const platform::Place& place) { auto& global_block = program.Block(0); + std::map param_map; for (auto& var : global_block.AllVars()) { - int is_feed_var = - std::count(feed_var_names_.begin(), feed_var_names_.end(), var->Name()); - if ((var->Persistable() || is_feed_var) && microbatch_id == 0) { - if (is_feed_var) { - auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name()); - VLOG(3) << "data name: " << var->Name() << ", ptr: " << new_ptr; - InitializeVariable(new_ptr, var->GetType()); - } else { - auto* ptr = root_scope_->FindVar(var->Name()); - auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name()); - VLOG(3) << "Create persistable var " << var->Name() << " for minibatch " - << section_id << ", which pointer is " << new_ptr; - InitializeVariable(new_ptr, var->GetType()); - const LoDTensor& root_tensor = ptr->Get(); - LoDTensor* minibatch_tensor = new_ptr->GetMutable(); - TensorCopy(*static_cast(&root_tensor), place, - static_cast(minibatch_tensor)); - } - } else if (!var->Persistable() && !is_feed_var) { - auto* ptr = - microbatch_scopes_[section_id][microbatch_id]->Var(var->Name()); - VLOG(3) << "Create variable " << var->Name() << " for section " - << section_id << " microbatch " << microbatch_id - << ", which pointer is " << ptr; - InitializeVariable(ptr, var->GetType()); + if (var->Persistable()) { + param_map[var->Name()] = 1; } } -} -void PipelineTrainer::GetSkipVars(int section_id, const ProgramDesc& program) { - auto& global_block = program.Block(0); - for (auto& op : global_block.AllOps()) { - if (op->Type() != "enqueue") { - continue; + for (auto& var : global_block.AllVars()) { + bool is_param_grad = false; + size_t pos = 0; + if ((pos = var->Name().find(kGradVarSuffix)) != std::string::npos) { + auto prefix_name = var->Name().substr(0, pos); + if (param_map.find(prefix_name) != param_map.end()) { + is_param_grad = true; + } } - auto input_arg_names = op->InputArgumentNames(); - PADDLE_ENFORCE_EQ(input_arg_names.size(), 1, - platform::errors::InvalidArgument( - "Number of input arguments for enqueue op must be 1, " - "but the value is %d.", - input_arg_names.size())); - std::string input_arg_name = input_arg_names[0]; - if (input_arg_name.rfind("@GRAD") != input_arg_name.size() - 5) { - skip_vars_[section_id].emplace_back(input_arg_name); - VLOG(3) << "add skip var name: " << input_arg_name; + if (var->Persistable() && microbatch_id == 0) { + auto* ptr = root_scope_->Var(var->Name()); + InitializeVariable(ptr, var->GetType()); + VLOG(3) << "Create persistable var: " << var->Name() + << ", which pointer is " << ptr; + } else if (is_param_grad && microbatch_id == 0) { + auto* ptr = minibatch_scope_->Var(var->Name()); + InitializeVariable(ptr, var->GetType()); + VLOG(3) << "Create grad for persistable var: " << var->Name() + << ", which pointer is " << ptr; + } else if (!var->Persistable() && !is_param_grad) { + auto* ptr = microbatch_scopes_[microbatch_id]->Var(var->Name()); + VLOG(3) << "Create variable " << var->Name() << " for microbatch " + << microbatch_id << ", which pointer is " << ptr; + InitializeVariable(ptr, var->GetType()); } } } void PipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program, const platform::Place& place) { - PADDLE_ENFORCE_NOT_NULL(root_scope_, - platform::errors::InvalidArgument( - "root_scope pointer can not be nullptr")); - auto start_cpu_id = trainer_desc_.section_param().start_cpu_core_id(); - SectionWorker::cpu_id_.store(start_cpu_id); - minibatch_scopes_.resize(section_num_); - microbatch_scopes_.resize(section_num_); - skip_vars_.resize(section_num_); - - VLOG(3) << "Init ScopeQueues and create all scopes"; - for (int i = 0; i < section_num_; ++i) { - minibatch_scopes_[i] = &root_scope_->NewScope(); - std::shared_ptr program; - program.reset(new ProgramDesc( - trainer_desc_.section_param().section_config(i).program_desc())); - microbatch_scopes_[i].resize(num_microbatches_); - for (int j = 0; j < num_microbatches_; ++j) { - microbatch_scopes_[i][j] = &minibatch_scopes_[i]->NewScope(); - CopyParameters(i, j, *program, places_[i]); - } - GetSkipVars(i, *program); + PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument( + "root_scope_ can not be nullptr")); + microbatch_scopes_.resize(num_microbatches_); + + VLOG(3) << "Create minibatch and microbatch scopes..."; + minibatch_scope_ = &root_scope_->NewScope(); + std::shared_ptr program; + program.reset(new ProgramDesc( + trainer_desc_.section_param().section_config().program_desc())); + for (int j = 0; j < num_microbatches_; ++j) { + microbatch_scopes_[j] = &minibatch_scope_->NewScope(); + CopyParameters(j, *program, place_); } - for (int i = 0; i < section_num_; ++i) { - auto this_worker = - std::dynamic_pointer_cast( - workers_[i]); - this_worker->SetRootScope(root_scope_); - this_worker->SetMinibatchScope(minibatch_scopes_[i]); - this_worker->SetMicrobatchScopes(microbatch_scopes_[i]); - this_worker->SetSkipVars(skip_vars_[i]); - } + auto this_worker = + std::dynamic_pointer_cast(worker_); + this_worker->SetRootScope(root_scope_); + this_worker->SetMinibatchScope(minibatch_scope_); + this_worker->SetMicrobatchScopes(microbatch_scopes_); } void PipelineTrainer::Run() { - VLOG(3) << "Going to run"; - for (int i = 0; i < section_num_; ++i) { - if (!debug_) { - section_threads_.push_back( - std::thread(&DeviceWorker::TrainFiles, workers_[i].get())); - } else { - section_threads_.push_back(std::thread( - &DeviceWorker::TrainFilesWithProfiler, workers_[i].get())); - } - } + VLOG(5) << "Going to run PipelineTrainer::Run()"; + section_thread_ = std::async(&DeviceWorker::TrainFiles, worker_.get()); } void PipelineTrainer::Finalize() { - for (auto& th : section_threads_) { - th.join(); + try { + section_thread_.get(); + } catch (platform::EOFException& e) { + std::rethrow_exception(std::current_exception()); } if (need_dump_field_) { FinalizeDumpEnv(); } - VLOG(3) << "copying back parameters. "; - for (int i = 0; i < section_num_; ++i) { - std::shared_ptr program; - program.reset(new ProgramDesc( - trainer_desc_.section_param().section_config(i).program_desc())); - for (int j = 0; j < num_microbatches_; ++j) { - auto& global_block = program->Block(0); - for (auto& var : global_block.AllVars()) { - if (var->Persistable()) { - auto* ptr = root_scope_->FindVar(var->Name()); - LoDTensor* root_tensor = ptr->GetMutable(); - auto* minibatch_ptr = minibatch_scopes_[i]->Var(var->Name()); - const LoDTensor& minibatch_tensor = minibatch_ptr->Get(); - TensorCopy(*static_cast(&minibatch_tensor), places_[0], - static_cast(root_tensor)); - VLOG(4) << "Copy persitable var " << var->Name() << " to root scope"; - } - } - } - } root_scope_->DropKids(); - SectionWorker::ResetBatchId(); - SectionWorker::ResetThreadCompletedFlag(); } Scope* PipelineTrainer::GetWorkerScope(int thread_id) { - return microbatch_scopes_[thread_id][0]; + return microbatch_scopes_[0]; } } // end namespace framework diff --git a/paddle/fluid/framework/section_worker.cc b/paddle/fluid/framework/section_worker.cc index b9a3cac0ec..6634cb98d6 100644 --- a/paddle/fluid/framework/section_worker.cc +++ b/paddle/fluid/framework/section_worker.cc @@ -30,540 +30,94 @@ limitations under the License. */ namespace paddle { namespace framework { -std::atomic SectionWorker::cpu_id_(0); -std::mutex SectionWorker::thread_mutex; -std::condition_variable SectionWorker::thread_condition; -bool SectionWorker::threads_completed = false; uint64_t SectionWorker::batch_id_(0); void SectionWorker::Initialize(const TrainerDesc& desc) { dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_); - program_.reset(new ProgramDesc( - desc.section_param().section_config(section_id_).program_desc())); + program_.reset( + new ProgramDesc(desc.section_param().section_config().program_desc())); for (auto& op_desc : program_->Block(0).AllOps()) { ops_.push_back(OpRegistry::CreateOp(*op_desc)); } } -void SectionWorker::AutoSetCPUAffinity(bool reuse) { - int thread_cpu_id = cpu_id_.fetch_add(1); - - unsigned concurrency_cap = std::thread::hardware_concurrency(); - unsigned proc = thread_cpu_id; - - if (proc >= concurrency_cap) { - if (reuse) { - proc %= concurrency_cap; - } else { - LOG(INFO) << "All " << concurrency_cap - << " CPUs have been set affinities. Fail to set " - << thread_cpu_id << "th thread"; - return; - } - } - - cpu_set_t mask; - CPU_ZERO(&mask); - CPU_SET(proc, &mask); - - if (-1 == sched_setaffinity(0, sizeof(mask), &mask)) { - LOG(WARNING) << "Fail to set thread affinity to CPU " << proc; - return; - } - - CPU_ZERO(&mask); - if ((0 != sched_getaffinity(0, sizeof(mask), &mask)) || - (0 == CPU_ISSET(proc, &mask))) { - LOG(WARNING) << "Fail to set thread affinity to CPU " << proc; - } - VLOG(3) << "Set " << thread_cpu_id << "th thread affinity to CPU " << proc; -} - void SectionWorker::TrainFiles() { - VLOG(3) << "begin section_worker TrainFiles"; - AutoSetCPUAffinity(true); + VLOG(5) << "begin section_worker TrainFiles"; - int64_t max_memory_size = 0; + int64_t max_memory_size = GetEagerDeletionThreshold(); std::unique_ptr gc; auto unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_); + if (max_memory_size >= 0) { #ifdef PADDLE_WITH_CUDA - if (platform::is_gpu_place(place_)) { - if (IsFastEagerDeletionModeEnabled()) { - gc.reset(new UnsafeFastGPUGarbageCollector( - BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size)); - } else { - gc.reset(new DefaultStreamGarbageCollector( - BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size)); + if (platform::is_gpu_place(place_)) { + if (IsFastEagerDeletionModeEnabled()) { + gc.reset(new UnsafeFastGPUGarbageCollector( + BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size)); + } } - } else if (platform::is_cpu_place(place_)) { #endif - gc.reset(new CPUGarbageCollector( - BOOST_GET_CONST(platform::CPUPlace, place_), max_memory_size)); -#ifdef PADDLE_WITH_CUDA } -#endif - if (thread_id_ == 0) { - while (true) { - // Start a minibatch. - for (int i = 0; i < num_microbatches_; ++i) { - try { - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - // We run op with op_role = kLRSched only for the first microbatch - // to avoid increasing the @LR_DECAY_STEP@ multiple times. - bool run_first_mbatch = - op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)) || - op_role == static_cast(OpRole::kLRSched); - bool run_others = op_role == static_cast(OpRole::kForward) || + for (int i = 0; i < num_microbatches_; ++i) { + for (auto& op : ops_) { + int op_role = op->Attr(std::string("op_role")); + // We run op with op_role = kLRSched only for the first microbatch + // to avoid increasing the @LR_DECAY_STEP@ multiple times. + bool run_first_mbatch = op_role == static_cast(OpRole::kForward) || op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)); - if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - } - } - } catch (platform::EOFException&) { - std::unique_lock lk(thread_mutex); - threads_completed = true; - VLOG(3) << "thread " << thread_id_ << " completed."; - VLOG(3) << "called notify all"; - thread_condition.notify_all(); - VLOG(0) << "EOF encountered"; - return; - } - if (i == 0) { - VLOG(3) << "called notify all"; - std::unique_lock lk(thread_mutex); - batch_id_ += 1; - thread_condition.notify_all(); + static_cast(OpRole::kLoss)) || + op_role == static_cast(OpRole::kLRSched); + bool run_others = op_role == static_cast(OpRole::kForward) || + op_role == (static_cast(OpRole::kForward) | + static_cast(OpRole::kLoss)); + if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { + VLOG(3) << "Forward: running op " << op->Type() << " for micro-batch " + << i; + op->Run(*microbatch_scopes_[i], place_); + if (gc) { + DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, + gc.get()); } } - // backward pass - for (int i = 0; i < num_microbatches_; ++i) { - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kBackward) || - op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - } - } - } - // update pass - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kOptimize)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for minibatch scope"; - op->Run(*microbatch_scopes_[0], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], - op.get(), unused_vars_, gc.get()); - } - } - } - dev_ctx_->Wait(); } - } else { - while (true) { - { - PADDLE_ENFORCE_LE( - local_batch_id_, batch_id_, - platform::errors::InvalidArgument( - "local_batch_id_ (%d) must be less than or equal to " - "batch_id_ (%d)", - local_batch_id_, batch_id_)); - std::unique_lock lk(thread_mutex); - if (local_batch_id_ == batch_id_ && !threads_completed) { - thread_condition.wait(lk); - } - VLOG(3) << "thread " << thread_id_ << " local_batch_id_ " - << local_batch_id_ << " batch_id_ " << batch_id_; - if (threads_completed) { - VLOG(3) << "thread " << thread_id_ << " completed."; - lk.unlock(); - return; - } - lk.unlock(); - local_batch_id_ += 1; - } - // forward pass: - for (int i = 0; i < num_microbatches_; ++i) { - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - // We run op with op_role = kLRSched only for the first microbatch - // to avoid increasing the @LR_DECAY_STEP@ multiple times. - bool run_first_mbatch = - op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)) || - op_role == static_cast(OpRole::kLRSched); - bool run_others = op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)); - if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - } - } - } - // backward pass - for (int i = 0; i < num_microbatches_; ++i) { - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kBackward) || - op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - } - } - } - // update pass - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kOptimize)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for minibatch scope"; - op->Run(*microbatch_scopes_[0], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], - op.get(), unused_vars_, gc.get()); - } + cudaDeviceSynchronize(); + } + + // backward pass + for (int i = 0; i < num_microbatches_; ++i) { + for (auto& op : ops_) { + int op_role = op->Attr(std::string("op_role")); + if (op_role == static_cast(OpRole::kBackward) || + op_role == (static_cast(OpRole::kBackward) | + static_cast(OpRole::kLoss))) { + VLOG(3) << "Backward: running op " << op->Type() << " for micro-batch " + << i; + op->Run(*microbatch_scopes_[i], place_); + if (gc) { + DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, + gc.get()); } } - dev_ctx_->Wait(); } + cudaDeviceSynchronize(); } -} - -void SectionWorker::TrainFilesWithProfiler() { - VLOG(3) << "begin section_worker TrainFiles with profiler"; - AutoSetCPUAffinity(true); - - platform::Timer batch_timer; - platform::Timer timeline; - std::vector op_total_time; - std::vector op_name; - std::vector op_max_time; - std::vector op_min_time; - std::vector op_count; + // update pass for (auto& op : ops_) { - op_name.push_back(op->Type()); - } - op_total_time.resize(ops_.size()); - op_max_time.resize(ops_.size()); - op_min_time.resize(ops_.size()); - for (size_t i = 0; i < op_min_time.size(); ++i) { - op_min_time[i] = DBL_MAX; - } - op_count.resize(ops_.size()); - - int64_t max_memory_size = 0; - std::unique_ptr gc; - // const std::vector keep_vars; - auto unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_); -#ifdef PADDLE_WITH_CUDA - if (platform::is_gpu_place(place_)) { - if (IsFastEagerDeletionModeEnabled()) { - gc.reset(new UnsafeFastGPUGarbageCollector( - BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size)); - } else { - gc.reset(new DefaultStreamGarbageCollector( - BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size)); - } - } else if (platform::is_cpu_place(place_)) { -#endif - gc.reset(new CPUGarbageCollector( - BOOST_GET_CONST(platform::CPUPlace, place_), max_memory_size)); -#ifdef PADDLE_WITH_CUDA - } -#endif - - if (thread_id_ == 0) { - while (true) { - // Start a minibatch. - // int batch_size = 0; - batch_timer.Start(); - for (int i = 0; i < num_microbatches_; ++i) { - try { - int op_idx = 0; - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - // We run op with op_role = kLRSched only for the first microbatch - // to avoid increasing the @LR_DECAY_STEP@ multiple times. - bool run_first_mbatch = - op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)) || - op_role == static_cast(OpRole::kLRSched); - bool run_others = op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)); - if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - timeline.Start(); - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - } - op_idx++; - } - } catch (platform::EOFException&) { - std::unique_lock lk(thread_mutex); - threads_completed = true; - VLOG(3) << "thread " << thread_id_ << " completed."; - VLOG(3) << "called notify all"; - thread_condition.notify_all(); - VLOG(0) << "EOF encountered"; - VLOG(0) << "============timeline============"; - for (size_t i = 0; i < ops_.size(); ++i) { - VLOG(0) << "op: " << op_name[i] << ", max_time: " << op_max_time[i] - << ", min_time: " << op_min_time[i] - << ", mean_time: " << op_total_time[i] / op_count[i]; - } - VLOG(0) << "================================"; - return; - } - if (i == 0) { - VLOG(3) << "called notify all"; - std::unique_lock lk(thread_mutex); - batch_id_ += 1; - thread_condition.notify_all(); - } + int op_role = op->Attr(std::string("op_role")); + if (op_role == static_cast(OpRole::kOptimize)) { + VLOG(3) << "Update: running op " << op->Type(); + op->Run(*microbatch_scopes_[0], place_); + if (gc) { + DeleteUnusedTensors(*microbatch_scopes_[0], op.get(), unused_vars_, + gc.get()); } - // backward pass - for (int i = 0; i < num_microbatches_; ++i) { - int op_idx = 0; - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kBackward) || - op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - timeline.Start(); - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - } - op_idx++; - } - } - // update pass - int op_idx = 0; - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kOptimize)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for minibatch scope"; - timeline.Start(); - op->Run(*microbatch_scopes_[0], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], - op.get(), unused_vars_, gc.get()); - } - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - } - op_idx++; - } - dev_ctx_->Wait(); - batch_timer.Pause(); - VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); - } - } else { - while (true) { - { - PADDLE_ENFORCE_LE( - local_batch_id_, batch_id_, - platform::errors::InvalidArgument( - "local_batch_id_ (%d) must be less than or equal to " - "batch_id_ (%d)", - local_batch_id_, batch_id_)); - std::unique_lock lk(thread_mutex); - if (local_batch_id_ == batch_id_ && !threads_completed) { - thread_condition.wait(lk); - } - VLOG(3) << "thread " << thread_id_ << " local_batch_id_ " - << local_batch_id_ << " batch_id_ " << batch_id_; - if (threads_completed) { - VLOG(3) << "thread " << thread_id_ << " completed."; - lk.unlock(); - VLOG(0) << "============timeline============"; - for (size_t i = 0; i < ops_.size(); ++i) { - VLOG(0) << "op: " << op_name[i] << ", max_time: " << op_max_time[i] - << ", min_time: " << op_min_time[i] - << ", mean_time: " << op_total_time[i] / op_count[i]; - } - VLOG(0) << "================================"; - return; - } - lk.unlock(); - local_batch_id_ += 1; - } - // forward pass: - for (int i = 0; i < num_microbatches_; ++i) { - int op_idx = 0; - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - // We run op with op_role = kLRSched only for the first microbatch - // to avoid increasing the @LR_DECAY_STEP@ multiple times. - bool run_first_mbatch = - op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)) || - op_role == static_cast(OpRole::kLRSched); - bool run_others = op_role == static_cast(OpRole::kForward) || - op_role == (static_cast(OpRole::kForward) | - static_cast(OpRole::kLoss)); - if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - timeline.Start(); - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - } - op_idx++; - } - } - // backward pass - for (int i = 0; i < num_microbatches_; ++i) { - int op_idx = 0; - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kBackward) || - op_role == (static_cast(OpRole::kBackward) | - static_cast(OpRole::kLoss))) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for scope " << i; - timeline.Start(); - op->Run(*microbatch_scopes_[i], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), - unused_vars_, gc.get()); - } - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - } - op_idx++; - } - } - // update pass - int op_idx = 0; - for (auto& op : ops_) { - int op_role = op->Attr(std::string("op_role")); - if (op_role == static_cast(OpRole::kOptimize)) { - VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ - << " for minibatch scope"; - timeline.Start(); - op->Run(*microbatch_scopes_[0], place_); - if (gc) { - DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], - op.get(), unused_vars_, gc.get()); - } - timeline.Pause(); - auto time = timeline.ElapsedUS(); - op_total_time[op_idx] += time; - if (time > op_max_time[op_idx]) { - op_max_time[op_idx] = time; - } - if (time < op_min_time[op_idx]) { - op_min_time[op_idx] = time; - } - op_count[op_idx] += 1; - op_total_time[op_idx] += time; - } - op_idx++; - } - dev_ctx_->Wait(); } } + dev_ctx_->Wait(); + ++batch_id_; } + } // namespace framework } // namespace paddle #endif diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index 88dbe9c748..f4c8246938 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -290,29 +290,22 @@ class PipelineTrainer : public TrainerBase { virtual Scope* GetWorkerScope(int thread_id); void InitDumpEnv() override; virtual std::string GetDumpPath(int tid); - void GetSkipVars(int section_id, const ProgramDesc& main_program); + void GetSkipVars(const ProgramDesc& main_program); protected: - int section_num_; int num_microbatches_; - int start_cpu_core_id_; - std::vector feed_var_names_; - std::vector places_; - std::vector> skip_vars_; + platform::Place place_; + std::vector skip_vars_; TrainerDesc trainer_desc_; - std::vector section_threads_; - // worker: [section_id] - std::vector> workers_; - // minibatch_scopes_: [section_id] - std::vector minibatch_scopes_; - // microbatch_scopes_: [section_id][microbatch_id] - std::vector> microbatch_scopes_; + std::future section_thread_; + std::shared_ptr worker_; + Scope* minibatch_scope_; + // microbatch_scopes_: [microbatch_id] + std::vector microbatch_scopes_; - void CopyParameters(int section_id, int microbatch_id, - const ProgramDesc& program, const platform::Place& place); - bool isPersistableVarGrad(std::string name); - bool isPersistable(VarDesc* var); + void CopyParameters(int microbatch_id, const ProgramDesc& program, + const platform::Place& place); }; #endif diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto index 4d2e6d9b3a..c4e9064d05 100644 --- a/paddle/fluid/framework/trainer_desc.proto +++ b/paddle/fluid/framework/trainer_desc.proto @@ -86,7 +86,7 @@ message DownpourWorkerParameter { } message SectionWorkerParameter { - repeated SectionConfig section_config = 1; + optional SectionConfig section_config = 1; optional int32 queue_size = 2 [ default = 1 ]; optional int64 sync_steps = 3 [ default = 1 ]; optional int32 start_cpu_core_id = 4 [ default = 1 ]; diff --git a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py index 889fec838e..f3bdb305f4 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and from __future__ import print_function +from __future__ import division import paddle.fluid as fluid from paddle.fluid import core, unique_name @@ -21,9 +22,55 @@ from .meta_optimizer_base import MetaOptimizerBase from .common import OpRole, OP_ROLE_KEY, OP_ROLE_VAR_KEY, CollectiveHelper, is_update_op, is_loss_grad_op, is_backward_op, is_optimizer_op -class PipelineHelper(CollectiveHelper): - def __init__(self, role_maker, nrings=1, wait_port='6174'): - super(PipelineHelper, self).__init__(role_maker, nrings, wait_port) +def _get_node_num(endpoints): + ss = set() + for ep in endpoints: + ip = ep.split(":")[0].strip() + if ip not in ss: + ss.add(ip) + return len(ss) + + +class PipelineHelper(object): + def __init__(self, role_maker, wait_port='6174'): + self.wait_port = wait_port + self.role_maker = role_maker + + def update_startup_program(self, + startup_program=None, + inner_parallelism=None): + self.startup_program = startup_program + + endpoints = self.role_maker._get_trainer_endpoints() + current_endpoint = endpoints[self.role_maker._worker_index()] + node_num = _get_node_num(endpoints) + assert len(endpoints) % node_num == 0 + nranks = self.role_maker._worker_num() + rank = self.role_maker._worker_index() + + # Create ring 0 for all gpus in a pipeline + pipeline_endpoints = [] + pipeline_rank = rank % inner_parallelism + pipeline_id = rank // inner_parallelism + for idx, ep in enumerate(endpoints): + if idx // inner_parallelism == pipeline_id: + pipeline_endpoints.append(ep) + self._init_communicator(self.startup_program, current_endpoint, + pipeline_endpoints, pipeline_rank, 0, + self.wait_port) + + pipeline_num = len(endpoints) // inner_parallelism + if pipeline_num == 1: return + # Create rings for gpus with the same gpu id + eps = [] + local_rank = self.role_maker._worker_index() % inner_parallelism + ring_id = local_rank + 1 + for i in range(pipeline_num): + eps.append(endpoints[i * inner_parallelism + local_rank]) + temp_rank = self.role_maker._worker_index() // inner_parallelism + self._init_communicator(self.startup_program, current_endpoint, eps, + temp_rank, ring_id, self.wait_port) + self._broadcast_params(ring_id) def _init_communicator(self, program, current_endpoint, endpoints, rank, ring_id, wait_port): @@ -46,9 +93,8 @@ class PipelineHelper(CollectiveHelper): 'rank': rank, 'endpoint': current_endpoint, 'other_endpoints': other_endpoints, - OP_ROLE_KEY: OpRole.Forward + OP_ROLE_KEY: OpRole.Forward, }) - block.append_op( type='c_comm_init', inputs={'X': nccl_id_var}, @@ -58,12 +104,10 @@ class PipelineHelper(CollectiveHelper): 'rank': rank, 'ring_id': ring_id, OP_ROLE_KEY: OpRole.Forward, - 'device_id': OpRole.Forward }) - def _broadcast_params(self): + def _broadcast_params(self, ring_id): block = self.startup_program.global_block() - ring_id = 0 for param in block.iter_parameters(): if param.is_distributed: continue @@ -78,13 +122,12 @@ class PipelineHelper(CollectiveHelper): OP_ROLE_KEY: OpRole.Forward }) - for ring_id in range(self.nrings): - block.append_op( - type='c_sync_comm_stream', - inputs={'X': param}, - outputs={'Out': param}, - attrs={'ring_id': ring_id, - OP_ROLE_KEY: OpRole.Forward}) + block.append_op( + type='c_sync_comm_stream', + inputs={'X': param}, + outputs={'Out': param}, + attrs={'ring_id': ring_id, + OP_ROLE_KEY: OpRole.Forward}) class PipelineOptimizer(MetaOptimizerBase): @@ -99,8 +142,8 @@ class PipelineOptimizer(MetaOptimizerBase): user_defined_strategy): super(PipelineOptimizer, self)._set_basic_info( loss, role_maker, user_defined_optimizer, user_defined_strategy) - num_microbatches = user_defined_strategy.pipeline_configs['micro_batch'] - self.wrapped_opt = PO(self.inner_opt, num_microbatches=num_microbatches) + self.num_microbatches = user_defined_strategy.pipeline_configs[ + 'micro_batch'] def _can_apply(self): if not self.role_maker._is_collective: @@ -115,29 +158,46 @@ class PipelineOptimizer(MetaOptimizerBase): dist_strategy.pipeline_configs = {} def _enable_strategy(self, dist_strategy, context): - # we do not support enable pipeline automatically right now - return + dist_strategy.pipeline = True + dist_strategy.pipeline_configs = {"micro_batch": 1, } + + def _get_local_rank(self, current_endpoint, endpoints): + cur_node_endpoints = [] + cur_ip = current_endpoint.split(':')[0].strip() + for ep in endpoints: + if cur_ip == ep.split(':')[0].strip(): + cur_node_endpoints.append(ep) + return cur_node_endpoints.index(current_endpoint) def minimize_impl(self, loss, startup_program=None, parameter_list=None, no_grad_set=None): - optimize_ops, params_grads, prog_list = \ - self.wrapped_opt.minimize(loss, startup_program, - parameter_list, no_grad_set) - if self.role_maker._worker_num() == 1: - return optimize_ops, params_grads - endpoints = self.role_maker._get_trainer_endpoints() current_endpoint = endpoints[self.role_maker._worker_index()] + self.local_rank = self._get_local_rank(current_endpoint, endpoints) + self.wrapped_opt = PO(self.inner_opt, + num_microbatches=self.num_microbatches, + start_cpu_core_id=self.local_rank) + node_num = _get_node_num(endpoints) + gpus_per_node = len(endpoints) // node_num self.startup_program = startup_program + self.local_rank = self._get_local_rank(current_endpoint, endpoints) if startup_program is None: self.startup_program = fluid.default_startup_program() + loss.block.program._pipeline_opt = dict() + loss.block.program._pipeline_opt['local_rank'] = self.local_rank + optimize_ops, params_grads, prog_list = \ + self.wrapped_opt.minimize(loss, startup_program, + parameter_list, no_grad_set) + assert prog_list self.main_program_list = prog_list self.main_program = loss.block.program + self.inner_parallelism = loss.block.program._pipeline_opt[ + 'inner_parallelism'] nranks = len(endpoints) self.nranks = nranks self.nrings = len(self.main_program_list) @@ -146,24 +206,26 @@ class PipelineOptimizer(MetaOptimizerBase): self.endpoints = endpoints self.current_endpoint = current_endpoint - pipeline_helper = PipelineHelper(self.role_maker, nrings=self.nrings) - pipeline_helper.update_startup_program(self.startup_program) + pipeline_helper = PipelineHelper(self.role_maker) + pipeline_helper.update_startup_program( + self.startup_program._pipeline_opt["startup_program"], + self.inner_parallelism) - self._transpile_main_program() + self._transpile_main_program(loss, node_num, gpus_per_node) return optimize_ops, params_grads - def _transpile_main_program(self): - self._insert_loss_grad_ops() - for ring_id in range(self.nrings): + def _transpile_main_program(self, loss, node_num, gpus_per_node): + self._insert_loss_grad_ops(loss, gpus_per_node, node_num) + for ring_id in range(1, gpus_per_node + 1): self._insert_allreduce_ops(ring_id) - def _insert_loss_grad_ops(self): + def _insert_loss_grad_ops(self, loss, gpus_per_node, node_num): """ In order to keep the learning rate consistent in different numbers of training workers, we scale the loss grad by the number of workers """ - block = self.main_program_list[self.nrings - 1]['program'].global_block( - ) + block = self.main_program_list[gpus_per_node - 1][ + 'program'].global_block() for idx, op in reversed(list(enumerate(block.ops))): if is_loss_grad_op(op): loss_grad_var = block.vars[op.output_arg_names[0]] @@ -173,12 +235,12 @@ class PipelineOptimizer(MetaOptimizerBase): inputs={'X': loss_grad_var}, outputs={'Out': loss_grad_var}, attrs={ - 'scale': 1.0 / self.nranks, + 'scale': 1.0 / node_num, OP_ROLE_KEY: OpRole.Backward }) def _insert_allreduce_ops(self, ring_id): - block = self.main_program_list[ring_id]['program'].global_block() + block = self.main_program_list[ring_id - 1]['program'].global_block() origin_block = self.main_program.global_block() grad = None for idx, op in reversed(list(enumerate(block.ops))): diff --git a/python/paddle/fluid/device_worker.py b/python/paddle/fluid/device_worker.py index ec91417a0f..838aea37f1 100644 --- a/python/paddle/fluid/device_worker.py +++ b/python/paddle/fluid/device_worker.py @@ -413,25 +413,17 @@ class Section(DeviceWorker): section_param = trainer_desc.section_param section_param.num_microbatches = pipeline_opt["num_microbatches"] section_param.start_cpu_core_id = pipeline_opt["start_cpu_core_id"] - for i, program in enumerate(pipeline_opt["section_program_list"]): - cfg = section_param.section_config.add() - cfg.program_desc.ParseFromString(program["program"]._get_desc() - .serialize_to_string()) - # TODO: why does not work - # cfg.program_desc.CopyFrom(program.program._get_desc()) - place = pipeline_opt["place_list"][i] - place_id = pipeline_opt["place_id_list"][i] - if isinstance(place, core.CPUPlace): - cfg.place = cfg.CPUPlace - elif isinstance(place, core.CUDAPlace): - cfg.place = cfg.CUDAPlace - elif isinstance(place, core.CUDAPinnedPlace): - cfg.place = cfg.CUDAPinnedPlace - else: - raise NotImplementedError( - "SectionWorker only supports CPUPlace, CUDAPlace and CUDAPinnedPlace now." - ) - cfg.place_id = place_id + cfg = section_param.section_config + program = pipeline_opt["section_program"] + cfg.program_desc.ParseFromString(program["program"]._get_desc() + .serialize_to_string()) + # TODO: why does not work + # cfg.program_desc.CopyFrom(program.program._get_desc()) + place = pipeline_opt["place"] + place_id = pipeline_opt["place_id"] + assert isinstance(place, core.CUDAPlace) + cfg.place = cfg.CUDAPlace + cfg.place_id = place_id class DeviceWorkerFactory(object): diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index b4dfb9a914..57e44fca9c 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -561,6 +561,7 @@ class Executor(object): self._default_executor = core.Executor(p) self._closed = False self.pruned_program_scope_caches = dict() + self._prepare_to_run_called = False self._auto_checkpoint_name = unique_name.generate( "__auto_checkpoint_executor__") @@ -1115,6 +1116,24 @@ class Executor(object): use_default_main_program = program is None if program is None: program = default_main_program() + + if fetch_list is not None: + if isinstance(fetch_list, Variable) or isinstance( + fetch_list, str) or isinstance(fetch_list, + six.string_types): + fetch_list = [fetch_list] + assert isinstance(fetch_list, tuple) or isinstance(fetch_list, list), \ + "Currently , The fetch_list type only should be list or tuple, \n"\ + "but the input type is {}. For more information please refer to \n"\ + "the executor.run(...).".format(type(fetch_list)) + else: + fetch_list = [] + + if isinstance(program, Program) and program._pipeline_opt: + if "startup_program" in program._pipeline_opt: + program = program._pipeline_opt["startup_program"] + else: + return self.train_from_dataset(program, fetch_list=fetch_list) if isinstance(program, Program) and \ len(program.global_block().ops) == 0: if use_default_main_program: @@ -1131,18 +1150,6 @@ class Executor(object): if scope is None: scope = global_scope() - if fetch_list is not None: - if isinstance(fetch_list, Variable) or isinstance( - fetch_list, str) or isinstance(fetch_list, - six.string_types): - fetch_list = [fetch_list] - assert isinstance(fetch_list, tuple) or isinstance(fetch_list, list), \ - "Currently , The fetch_list type only should be list or tuple, \n"\ - "but the input type is {}. For more information please refer to \n"\ - "the executor.run(...).".format(type(fetch_list)) - else: - fetch_list = [] - # use_prune can be overrided by putting optimize_ops in fetch_list _origin_fetch_list = fetch_list _origin_program = program @@ -1449,6 +1456,25 @@ class Executor(object): raise RuntimeError("dataset is need and should be initialized") dataset._prepare_to_run() + real_fetch_list = [] + if program._pipeline_opt: + real_program = program._pipeline_opt["section_program"]['program'] + for fetch_var in fetch_list: + if isinstance(fetch_var, Variable): + fetch_var_name = fetch_var.name + else: + fetch_var_name = fetch_var + if fetch_var_name in real_program.global_block().vars: + real_fetch_list.append(fetch_var) + + program._pipeline_opt["section_program"][ + 'program'] = self._add_feed_fetch_ops( + program=program._pipeline_opt["section_program"]['program'], + feed=[], + fetch_list=real_fetch_list, + feed_var_name='feed', + fetch_var_name='fetch') + fetch_list = None scope, trainer = self._prepare_trainer( program=program, @@ -1483,6 +1509,10 @@ class Executor(object): dataset._dynamic_adjust_after_train() dataset._finish_to_run() + if real_fetch_list: + arr = scope.find_var('fetch').get_fetch_list() + tensors = arr._move_to_list() + return as_numpy(tensors) return None diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py old mode 100755 new mode 100644 index cf49268a65..7f9ade8fcb --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -3743,15 +3743,9 @@ class PipelineOptimizer(object): exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) batch_size = 1 - filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"] - dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset") - dataset.set_use_var([x,y]) - dataset.set_batch_size(batch_size) - dataset.set_filelist(filelist) data_loader.start() exe.train_from_dataset( - fluid.default_main_program(), - dataset) + fluid.default_main_program()) data_loader.reset() """ @@ -3769,7 +3763,7 @@ class PipelineOptimizer(object): "num_microbatches must be a positive value.") self._num_microbatches = num_microbatches assert start_cpu_core_id >= 0, ( - "start_cpu_core_id must be greater than or equal to 0.") + "start_cpu_core_id must be a non-negative integer.") self._start_cpu_core_id = start_cpu_core_id self._place_list = None op_maker = core.op_proto_and_checker_maker @@ -3777,7 +3771,7 @@ class PipelineOptimizer(object): self._op_role_key = op_maker.kOpRoleAttrName() self._op_role_var_key = op_maker.kOpRoleVarAttrName() self._op_device_key = op_maker.kOpDeviceAttrName() - self._param_device_map = dict() + self._param_device_map = None def _create_vars(self, block, main_program): # Create vars for block, copied from main_program's global block @@ -3793,7 +3787,10 @@ class PipelineOptimizer(object): used_var_set.add(var) source_var = main_program.block(0).var(str(var)) if source_var.type == core.VarDesc.VarType.READER: - block.create_var(name=var, type=core.VarDesc.VarType.READER) + block.create_var( + name=var, + type=core.VarDesc.VarType.READER, + persistable=source_var.persistable) else: block._clone_variable(source_var, False) @@ -3816,28 +3813,48 @@ class PipelineOptimizer(object): return 'Param' in op.input_names and 'Grad' in op.input_names and ( "LearningRate" in op.input_names) - def _split_program(self, main_program): + def _split_program(self, main_program, devices): """ Split a program into sections according to devices that ops run on. + The ops of the role LRSched are copied to all sections. Args: main_program (Program): the main program + devices: all used devices """ programs = [] # Map from device to its corresponding section program info device_program_map = dict() - block = main_program.block(0) + for device in devices: + p = {'program': Program()} + device_program_map[device] = p + block = main_program.block(0) for op in block.ops: device = op.attr(self._op_device_key) - - if device not in device_program_map: - program = {"program": Program()} - device_program_map[device] = program - program = device_program_map[device] - op_desc = op.desc - ap_op = program["program"].block(0).desc.append_op() - ap_op.copy_from(op_desc) + op_role = op.attr(self._op_role_key) + if int(op_role) & int(self._op_role.LRSched): + # Copy ops of the role LRSched to all sections. + for device in device_program_map.keys(): + program = device_program_map[device] + op_desc = op.desc + ap_op = program["program"].block(0).desc.append_op() + ap_op.copy_from(op_desc) + ap_op._set_attr(self._op_device_key, "") + elif op.type == "create_py_reader" or op.type == "read": + # Copy read related ops to all section to make them exit after each epoch. + for device in device_program_map.keys(): + program = device_program_map[device] + op_desc = op.desc + ap_op = program["program"].block(0).desc.append_op() + ap_op.copy_from(op_desc) + ap_op._set_attr(self._op_device_key, "") + else: + program = device_program_map[device] + op_desc = op.desc + ap_op = program["program"].block(0).desc.append_op() + ap_op.copy_from(op_desc) + ap_op._set_attr(self._op_device_key, "") for key in sorted(device_program_map.keys()): program = device_program_map[key] @@ -3846,6 +3863,24 @@ class PipelineOptimizer(object): return programs + def _split_startup_program(self, startup_program, local_rank): + block = startup_program.block(0) + new_startup_program = Program() + for op in block.ops: + device = op.attr(self._op_device_key) + if device: + device_index = int(device.split(":")[1]) + else: + device_index = None + if device_index is not None and device_index != local_rank: continue + op_desc = op.desc + ap_op = new_startup_program.block(0).desc.append_op() + ap_op.copy_from(op_desc) + ap_op._set_attr(self._op_device_key, "") + new_startup_program._sync_with_cpp() + self._create_vars(new_startup_program.block(0), startup_program) + return new_startup_program + def _find_post_op(self, ops, cur_op, var_name): """ Find the real post op that has variable named var_name as input. @@ -3867,9 +3902,8 @@ class PipelineOptimizer(object): for in_var_name in op.input_arg_names: if in_var_name == var_name: post_op.append(op) + break if post_op: - if not len(post_op) == 1: - raise ValueError("Each op can only have one post op.") return post_op[0] return None @@ -3885,6 +3919,8 @@ class PipelineOptimizer(object): """ prev_op = [] for op in ops: + if op.type == 'send_v2' or op.type == 'recv_v2': + continue if op == cur_op: break for out_var_name in op.output_arg_names: @@ -3923,61 +3959,27 @@ class PipelineOptimizer(object): def _get_data_var_info(self, block): """ - Get all vars whose is_data attribute are true and then rename them. - - For PipelineTrainer, all data vars are binded to - minibatch scope, so we have to feed them to the microbatch - to avoid conflicts. The vars feeded to microbatch have to - be renamed. + Get info of all vars whose is_data attribute are true. """ - # A map from var name to the renamed name. - raw_name_new_name_map = dict() - # Because we will create vars in block, it is more safe - # to get all var_names before iteration. - var_names = list(block.vars.keys()) - for var_name in var_names: - var = block.var(var_name) - if not var.is_data: - continue - assert var_name not in raw_name_new_name_map, ( - "{} has already been processed.".format(var_name)) - new_name = unique_name.generate(var_name) - raw_name_new_name_map[var_name] = new_name - new_var = self._create_var(block, var, new_name) - new_var.is_data = False - - # map of data to devices that that data on + # map of data vars to devices that that data on data_devices_map = dict() for op in block.ops: dev_spec = op.attr(self._op_device_key) for var_name in op.input_arg_names: - if var_name not in raw_name_new_name_map: + if "blocking_queue" in var_name: continue + var = block.var(var_name) + if not var.is_data: continue if not var_name in data_devices_map: data_devices_map[var_name] = [] if not dev_spec in data_devices_map[var_name]: data_devices_map[var_name].append(dev_spec) - new_name = raw_name_new_name_map[var_name] - #self._rename_arg(op, var_name, new_name) - return data_devices_map, raw_name_new_name_map - - def _rename_var_in_block(self, block, raw_name_new_name_map): - """ - Rename vars whose names in raw_name_new_name_map to the corresponding - new names. - """ - for op in block.ops: - if op.type == "enqueue" or op.type == "dequeue": - continue - for var_name in op.input_arg_names: - if var_name in raw_name_new_name_map: - new_name = raw_name_new_name_map[var_name] - self._rename_arg(op, var_name, new_name) + return data_devices_map - def _insert_enq_deq_for_data_var(self, main_block, programs, startup, - devices): + def _insert_sendrecv_for_data_var(self, main_block, programs, startup, + devices): """ - Insert enqueue and dequeue ops for data var + Insert send and recv ops for data var that on other devices. Args: main_block (Block): Global block for main program @@ -3986,48 +3988,34 @@ class PipelineOptimizer(object): devices (list): List of devices in the format (dev:dev_index) """ main_program = main_block.program - data_devices_map, raw_name_new_name_map = self._get_data_var_info( - main_block) + data_devices_map = self._get_data_var_info(main_block) first_prog = programs[0]['program'] first_block = first_prog.block(0) - enqueue_index = 0 - if first_block.ops[0].type == "create_py_reader" or ( - first_block.ops[1].type == "create_py_reader"): - for op in first_block.ops: - if op.type == "read": - enqueue_index += 1 - break - enqueue_index += 1 + insert_index = 0 + for op in first_block.ops: + insert_index += 1 + if op.type == "read": + break first_dev_spec = devices[0] + first_dev_index = int(first_dev_spec.split(':')[1]) for var_name in data_devices_map.keys(): for device in data_devices_map[var_name]: - # step1: generate queue for each pair of data var and device - # that that data on - queue_name = var_name + "_blocking_queue" - queue_name = unique_name.generate(queue_name) - queue_var = startup.block(0).create_var( - name=queue_name, - persistable=True, - type=core.VarDesc.VarType.RAW) - startup.block(0).append_op( - type='queue_generator', - attrs={ - 'names': [queue_name], - 'capacity': self._num_microbatches - }) + if device == first_dev_spec: continue main_var = main_block.var(var_name) assert main_var.is_data if not var_name in first_block.vars: self._create_var(first_block, main_var, var_name) + dev_index = int(device.split(':')[1]) first_block._insert_op( - index=enqueue_index, - type='enqueue', + index=insert_index, + type='send_v2', inputs={'X': first_block.var(var_name)}, attrs={ - 'queue_name': queue_name, self._op_device_key: first_dev_spec, - self._op_role_key: self._op_role.Forward + self._op_role_key: self._op_role.Forward, + 'use_calc_stream': True, + 'peer': dev_index, }) # Get the device that that data on assert device in devices @@ -4035,21 +4023,24 @@ class PipelineOptimizer(object): prog = programs[prog_index]['program'] block = prog.block(0) index = 0 - if device == first_dev_spec: - index = enqueue_index + 1 - new_name = raw_name_new_name_map[var_name] + for op in block.ops: + index += 1 + if op.type == "read": + break source_var = main_program.block(0).var(var_name) - new_var = self._create_var(block, source_var, new_name) + new_var = self._create_var(block, source_var, var_name) block._insert_op( index=index, - type='dequeue', + type='recv_v2', outputs={'Out': [new_var]}, attrs={ + 'out_shape': new_var.shape, + 'dtype': new_var.dtype, self._op_device_key: device, self._op_role_key: self._op_role.Forward, - 'queue_name': queue_name, + 'peer': first_dev_index, + 'use_calc_stream': True, }) - self._rename_var_in_block(block, raw_name_new_name_map) def _strip_grad_suffix(self, name): """ @@ -4064,18 +4055,6 @@ class PipelineOptimizer(object): """ return name + core.grad_var_suffix() - def _update_param_device_map(self, params_grads, block): - for param_grad in params_grads: - if not param_grad[0].trainable: continue - param_name = param_grad[0].name - ops = block.ops - for op in ops: - input_arg_names = op.input_arg_names - if param_name in input_arg_names: - self._param_device_map[param_name] = op.attr( - self._op_device_key) - break - def _add_opdevice_attr_for_regularization_clip(self, block): """ Add op_device attribute for regulization and clip ops. @@ -4090,7 +4069,7 @@ class PipelineOptimizer(object): assert self._op_role_var_key in op.attr_names op_role_var = op.all_attrs()[self._op_role_var_key] assert len(op_role_var) == 2 - param_name = block.vars[op_role_var[0]].name + param_name = op_role_var[0] device = self._param_device_map[param_name] op._set_attr(self._op_device_key, device) @@ -4159,32 +4138,37 @@ class PipelineOptimizer(object): "{} has not been set.".format(op.type)) if not dev_spec in device_specs: device_specs.append(dev_spec) + sorted_device_specs = sorted(device_specs) + assert sorted_device_specs == device_specs return device_specs - def _insert_enq_deq_ops_for_boundaries(self, block, origin_block, - startup_program): + def _insert_sendrecv_ops_for_boundaries(self, block): """ - Insert a pair of enqueue and dequeue ops for every two + Insert a pair of send and recv ops for every two consecutive ops on different devices. """ - startup_block = startup_program.global_block() extra_index = 0 # A map from var to device spec where op takes it as input, - # avoiding multiple enqueue and dequeue ops. + # avoiding multiple send and recv ops. var_devspec = dict() - for index, op in list(enumerate(origin_block.ops)): + for index, op in enumerate(list(block.ops)): + # skips lr-related ops and vars, as we will process them later. + if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched): + continue + # skips update ops and vars, as we will process them later. + if self._is_update_op(op): continue + cur_device_spec = op.attr(self._op_device_key) for var_name in op.input_arg_names: # i.e., lod_tensor_blocking_queue created by DataLoader, # which only exists in startup program. - if not var_name in origin_block.vars: continue + if not var_name in block.vars: continue var = block.var(var_name) # skip data, because we will process it later if var.is_data: continue - prev_op = self._find_real_prev_op(origin_block.ops, op, - var_name) + prev_op = self._find_real_prev_op(block.ops, op, var_name) if prev_op is None: continue prev_device_spec = prev_op.attr(self._op_device_key) @@ -4195,118 +4179,64 @@ class PipelineOptimizer(object): if cur_device_spec in var_devspec[var_name]: continue var_devspec[var_name].append(cur_device_spec) - queue_name = var_name + "_blocking_queue" - queue_name = unique_name.generate(queue_name) - queue_var = startup_block.create_var( - name=queue_name, - persistable=True, - type=core.VarDesc.VarType.RAW) - startup_block.append_op( - type='queue_generator', - attrs={ - 'names': [queue_name], - 'capacity': self._num_microbatches - }) op_role = op.all_attrs()[self._op_role_key] var = block.vars[var_name] + prev_device_index = int(prev_device_spec.split(':')[1]) + cur_device_index = int(cur_device_spec.split(':')[1]) block._insert_op( index=index + extra_index, - type='enqueue', + type='send_v2', inputs={'X': var}, attrs={ - 'queue_name': queue_name, self._op_device_key: prev_device_spec, - self._op_role_key: op_role + self._op_role_key: op_role, + 'use_calc_stream': True, + 'peer': cur_device_index, }) extra_index += 1 block._insert_op( index=index + extra_index, - type='dequeue', + type='recv_v2', outputs={'Out': [var]}, attrs={ + 'out_shape': var.shape, + 'dtype': var.dtype, self._op_device_key: cur_device_spec, - 'queue_name': queue_name, - self._op_role_key: op_role + self._op_role_key: op_role, + 'use_calc_stream': True, + 'peer': prev_device_index, }) extra_index += 1 - def _add_dequeue_ops_for_optimize(self, block, startup_program): - startup_block = startup_program.global_block() - grad_queue_map = dict() - grad_device_map = dict() - optimize_index = None - grad_names_to_dequeue = [] - - for index, op in reversed(list(enumerate(block.ops))): - device = op.attr(self._op_device_key) - # Optimizer pass - if not self._is_optimize_op(op): - optimize_index = index + 1 - break - if not self._is_update_op(op): continue - assert self._op_role_var_key in op.attr_names - op_role_var = op.all_attrs()[self._op_role_var_key] - assert len(op_role_var) == 2 - grad_name = op_role_var[1] - assert grad_name not in grad_device_map - assert grad_name not in grad_names_to_dequeue - grad_device_map[grad_name] = device - grad_names_to_dequeue.append(grad_name) - - for grad_name in grad_names_to_dequeue: - device = grad_device_map[grad_name] - grad_names = [] - grads = [] - queue_name = grad_name + "_blocking_queue" - queue_name = unique_name.generate(queue_name) - grad_queue_map[grad_name] = queue_name - ref_var = block.vars[grad_name] - queue_var = startup_block.create_var( - name=queue_name, - persistable=True, - type=core.VarDesc.VarType.RAW) - startup_block.append_op( - type='queue_generator', - attrs={ - 'names': [queue_name], - 'capacity': self._num_microbatches - }) - orig_var_name = self._strip_grad_suffix(grad_name) - for _ in range(self._num_microbatches): - u_name = unique_name.generate(orig_var_name) - u_grad_name = self._append_grad_suffix(u_name) - grad_var = self._create_var(block, ref_var, u_grad_name) - grad_names.append(u_grad_name) - grads.append(grad_var) - block._insert_op( - index=optimize_index, - type='dequeue', - outputs={'Out': grads}, - attrs={ - self._op_device_key: device, - 'queue_name': queue_name, - self._op_role_key: self._op_role.Optimize - }) - block._insert_op( - index=optimize_index + 1, - type='sum', - inputs={'X': grad_names}, - outputs={'Out': ref_var}, + def _clear_gradients(self, main_block, dev_spec): + """ + Clear gradients at the begining of each run of a minibatch. + """ + for param_name in self._param_device_map: + device = self._param_device_map[param_name] + if device != dev_spec: continue + grad_name = self._append_grad_suffix(param_name) + grad_var = main_block.vars[grad_name] + main_block._insert_op( + index=0, + type='fill_constant', + inputs={}, + outputs={'Out': [grad_var]}, attrs={ + 'shape': grad_var.shape, + 'dtype': grad_var.dtype, + 'value': float(0), self._op_device_key: device, - self._op_role_key: self._op_role.Optimize + # a trick to run this op once per mini-batch + self._op_role_key: self._op_role.Optimize.LRSched, }) - return grad_queue_map - def _insert_enq_deq_ops_for_update(self, block, startup_program): + def _accumulate_gradients(self, block): """ - Insert enqueue and dequeue ops for gradients of parameters. + Accumulate the gradients generated in microbatch to the one in mini-batch. + We also scale the loss corresponding to number of micro-batches as well. """ - startup_block = startup_program.global_block() - grad_queue_map = self._add_dequeue_ops_for_optimize(block, - startup_program) - - for index, op in reversed(list(enumerate(block.ops))): + for index, op in reversed(tuple(enumerate(list(block.ops)))): offset = index device = op.attr(self._op_device_key) @@ -4332,19 +4262,23 @@ class PipelineOptimizer(object): if len(op_role_var) == 0: continue assert len(op_role_var) % 2 == 0 + offset = index for i in range(0, len(op_role_var), 2): grad_name = op_role_var[i + 1] grad_var = block.vars[grad_name] - assert grad_name in grad_queue_map - queue_name = grad_queue_map[grad_name] + new_grad_var_name = unique_name.generate(grad_name) + new_var = self._create_var(block, grad_var, + new_grad_var_name) + self._rename_arg(op, grad_name, new_grad_var_name) block._insert_op( index=offset + 1, - type='enqueue', - inputs={'X': block.vars[grad_name]}, + type='sum', + inputs={'X': [grad_var, new_var]}, + outputs={'Out': grad_var}, attrs={ - 'queue_name': queue_name, self._op_device_key: device, - self._op_role_key: self._op_role.Backward + self._op_role_key: self._op_role.Backward, + self._op_role_var_key: op_role_var }) offset += 1 @@ -4401,7 +4335,9 @@ class PipelineOptimizer(object): for prog in var_info[var_name]: block = prog.block(0) for op in block.ops: - if op.type == "dequeue": continue + if op.type == "recv_v2" or op.type == "create_py_reader" or \ + op.type == "read": + continue # We have processed lr related vars if op.attr(self._op_role_key) == int( self._op_role.Optimize.LRSched): @@ -4421,45 +4357,39 @@ class PipelineOptimizer(object): write_prog = write_info[var_name] write_block = write_prog.block(0) write_device = self._get_device_info(write_block) + write_dev_index = int(write_device.split(':')[1]) all_progs = var_info[var_name] for prog in all_progs: if prog == write_prog: continue + read_block = prog.block(0) + read_device = self._get_device_info(read_block) + read_dev_index = int(read_device.split(':')[1]) - queue_name = var_name + "_blocking_queue" - queue_name = unique_name.generate(queue_name) - queue_var = startup_prog.block(0).create_var( - name=queue_name, - persistable=True, - type=core.VarDesc.VarType.RAW) - startup_prog.block(0).append_op( - type='queue_generator', - attrs={ - 'names': [queue_name], - 'capacity': self._num_microbatches - }) write_block._insert_op( index=0, - type='enqueue', + type='send_v2', inputs={'X': write_block.var(var_name), }, attrs={ - 'queue_name': queue_name, self._op_device_key: write_device, + 'use_calc_stream': True, # A trick to make the role LRSched to avoid copy every # microbatch - self._op_role_key: self._op_role.LRSched + self._op_role_key: self._op_role.LRSched, + 'peer': read_dev_index, }) - read_block = prog.block(0) - read_device = self._get_device_info(read_block) read_block._insert_op( index=0, - type='dequeue', + type='recv_v2', outputs={'Out': [read_block.var(var_name)]}, attrs={ + 'out_shape': read_block.var(var_name).shape, + 'dtype': read_block.var(var_name).dtype, self._op_device_key: read_device, + 'use_calc_stream': True, # A trick to make the role LRSched to avoid copy every # microbatch self._op_role_key: self._op_role.LRSched, - 'queue_name': queue_name, + 'peer': write_dev_index }) def minimize(self, @@ -4472,26 +4402,21 @@ class PipelineOptimizer(object): startup_program = default_startup_program() optimize_ops, params_grads = self._optimizer.minimize( loss, startup_program, parameter_list, no_grad_set) - self._update_param_device_map(params_grads, main_block) + self._param_device_map = self._optimizer._param_device_map # Step1: add default op_device attribute for regulization and clip ops self._add_opdevice_attr_for_regularization_clip(main_block) # Step2: add default op_device attribute for ops whose op_device - # attribute have not been set yet. + # attribute have not been set yet. Then check all ops have the + # op_device attribute. self._add_default_opdevice_attr(main_block) - device_specs = self._check_validation(main_block) - # Step3: add enqueue and dequeue ops between section boundaries - origin_prog = main_block.program.clone(for_test=False) - origin_main_block = origin_prog.global_block() - self._insert_enq_deq_ops_for_boundaries(main_block, origin_main_block, - startup_program) + device_specs = self._check_validation(main_block) + assert len(device_specs) > 1 - # Step4: add a pair of enqueue and dequeueN for parameter gradients - self._insert_enq_deq_ops_for_update(main_block, startup_program) - - main_program = main_block.program + # Step3: add send and recv ops between section boundaries + self._insert_sendrecv_ops_for_boundaries(main_block) place_list = [] place_id_list = [] @@ -4506,37 +4431,56 @@ class PipelineOptimizer(object): else: raise ValueError("Unknown device type: %s", dev_spec) - # Step5: split program into sections and add pairs of - # enqueue and dequeue ops for data var. - if len(place_list) == 0: - program_list = [] - ptmp = { - "program": main_program, - "input_set": set(), - "output_set": set() - } - program_list.append(ptmp) - else: - program_list = self._split_program(main_program) - for p in program_list: - self._create_vars(p["program"].block(0), main_program) - self._insert_enq_deq_for_data_var(main_block, program_list, - startup_program, device_specs) + # Step4: split program into sections and add pairs of + # send and recv ops for data var. + main_program = main_block.program + program_list = self._split_program(main_program, device_specs) + for p in program_list: + self._create_vars(p["program"].block(0), main_program) + self._insert_sendrecv_for_data_var(main_block, program_list, + startup_program, device_specs) - # Step6: Special Case: process persistable vars that exist in + # Step5: Special Case: process persistable vars that exist in # multiple sections self._process_persistable_vars_in_multi_sections( main_program, startup_program, program_list) - # Step7: Add sub blocks for section programs + # Step6: Add sub blocks for section programs self._add_sub_blocks(main_block, program_list) + assert (main_program._pipeline_opt and + isinstance(main_program._pipeline_opt, dict) and + 'local_rank' in main_program._pipeline_opt), \ + "You must use pipeline with fleet" + local_rank = main_program._pipeline_opt['local_rank'] + + # Step7: Split startup program + new_startup_program = self._split_startup_program(startup_program, + local_rank) + + # Step8: clear gradients before each mini-batch and + # accumulate gradients during backward + self._clear_gradients( + program_list[local_rank]['program'].global_block(), + dev_spec=device_specs[local_rank]) + self._accumulate_gradients(program_list[local_rank]['program'] + .global_block()) + + with open("startup_prog_%d" % local_rank, 'w') as f: + f.writelines(str(new_startup_program)) + with open("main_prog_%d" % local_rank, 'w') as f: + f.writelines(str(program_list[local_rank]['program'])) + + startup_program._pipeline_opt = { + "startup_program": new_startup_program, + } main_program._pipeline_opt = { "trainer": "PipelineTrainer", "device_worker": "Section", - "section_program_list": program_list, - "place_list": place_list, - "place_id_list": place_id_list, + "inner_parallelism": len(device_specs), + "section_program": program_list[local_rank], + "place": place_list[local_rank], + "place_id": place_id_list[local_rank], "sync_steps": -1, "num_microbatches": self._num_microbatches, "start_cpu_core_id": self._start_cpu_core_id, diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index b0205aebde..b76fe08b08 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -10,10 +10,12 @@ if(NOT WITH_NCCL) endif() string(REPLACE ".py" "" DIST_TEST_OPS "${DIST_TEST_OPS}") list(APPEND DIST_TEST_OPS test_parallel_dygraph_mnist) +list(APPEND DIST_TEST_OPS test_pipeline) list(APPEND DIST_TEST_OPS test_parallel_dygraph_se_resnext) list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding) list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding_over_height) list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer) +list(APPEND DIST_TEST_OPS test_fleet_pipeline_meta_optimizer) list(APPEND DIST_TEST_OPS test_listen_and_serv_op) list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer) set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS}) @@ -146,7 +148,6 @@ if (WITH_NCCL) endif() if(NOT WITH_GPU OR WIN32) - LIST(REMOVE_ITEM TEST_OPS test_pipeline) LIST(REMOVE_ITEM TEST_OPS test_boxps) endif() list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290 @@ -469,7 +470,6 @@ if(WITH_DISTRIBUTE) py_test_modules(test_fleet_sharding_meta_optimizer MODULES test_fleet_sharding_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_amp_meta_optimizer MODULES test_fleet_amp_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_fp16_allreduce_meta_optimizer MODULES test_fleet_fp16_allreduce_meta_optimizer ENVS ${dist_ENVS}) - py_test_modules(test_fleet_pipeline_meta_optimizer MODULES test_fleet_pipeline_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_private_function MODULES test_fleet_private_function ENVS ${dist_ENVS}) py_test_modules(test_fleet_meta_optimizer_base MODULES test_fleet_meta_optimizer_base ENVS ${dist_ENVS}) py_test_modules(test_fleet_distributed_strategy MODULES test_fleet_distributed_strategy) diff --git a/python/paddle/fluid/tests/unittests/pipeline_mnist.py b/python/paddle/fluid/tests/unittests/pipeline_mnist.py new file mode 100644 index 0000000000..8987646b3e --- /dev/null +++ b/python/paddle/fluid/tests/unittests/pipeline_mnist.py @@ -0,0 +1,136 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import numpy as np +import argparse +import time +import math + +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +from paddle.fluid import core +import unittest +from multiprocessing import Process +import os +import signal +from functools import reduce +from test_dist_base import TestDistRunnerBase, runtime_main +import paddle.distributed.fleet as fleet + +paddle.enable_static() + +DTYPE = "float32" +paddle.dataset.mnist.fetch() + +# Fix seed for test +fluid.default_startup_program().random_seed = 1 +fluid.default_main_program().random_seed = 1 + + +def cnn_model(data): + conv_pool_1 = fluid.nets.simple_img_conv_pool( + input=data, + filter_size=5, + num_filters=20, + pool_size=2, + pool_stride=2, + act="relu", + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant( + value=0.01))) + conv_pool_2 = fluid.nets.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + pool_size=2, + pool_stride=2, + act="relu", + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant( + value=0.01))) + + SIZE = 10 + input_shape = conv_pool_2.shape + param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE] + scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5 + + predict = fluid.layers.fc( + input=conv_pool_2, + size=SIZE, + act="softmax", + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Constant(value=0.01))) + return predict + + +class TestDistMnist2x2(TestDistRunnerBase): + def get_model(self, batch_size=2, use_dgc=False, dist_strategy=None): + # Input data + with fluid.device_guard("gpu:0"): + images = fluid.layers.data( + name='pixel', shape=[1, 28, 28], dtype=DTYPE) + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + if dist_strategy: + data_loader = fluid.io.DataLoader.from_generator( + feed_list=[images, label], + capacity=64, + use_double_buffer=False, + iterable=False) + # Train program + predict = cnn_model(images) + with fluid.device_guard("gpu:1"): + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + # Evaluator + with fluid.device_guard("gpu:1"): + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy( + input=predict, label=label, total=batch_size_tensor) + + inference_program = fluid.default_main_program().clone() + base_lr = self.lr + passes = [30, 60, 80, 90] + steps_per_pass = 10 + bd = [steps_per_pass * p for p in passes] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr) + opt = fluid.optimizer.Momentum(learning_rate=lr_val, momentum=0.9) + + # Reader + train_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=batch_size) + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=batch_size) + + if dist_strategy: + fleet.init(is_collective=True) + strategy = fleet.DistributedStrategy() + strategy.pipeline = True + dist_opt = fleet.distributed_optimizer( + optimizer=opt, strategy=strategy) + dist_opt.minimize(avg_cost) + else: + opt.minimize(avg_cost) + + if dist_strategy: + return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict, data_loader + else: + return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict + + +if __name__ == "__main__": + runtime_main(TestDistMnist2x2) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 10e154044f..19d9031573 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -124,6 +124,67 @@ class TestDistRunnerBase(object): exe.run(pserver_prog) print_to_err(type(self).__name__, "run pserver main program done.") + def run_pipeline_trainer(self, args): + self.lr = args.lr + + dist_strategy = DistributedStrategy() + test_program, avg_cost, train_reader, test_reader, batch_acc, predict, data_loader = \ + self.get_model(batch_size=args.batch_size, dist_strategy=dist_strategy) + + device_id = int(os.getenv("FLAGS_selected_gpus", "0")) + eprint(type(self).__name__, "device_id: %d." % device_id) + place = fluid.CUDAPlace(device_id) + + exe = fluid.Executor(place) + exe.run(fluid.default_startup_program()) + eprint(type(self).__name__, "run worker startup program done.") + + data_loader.set_sample_list_generator(train_reader, place) + data_loader.start() + print_to_err(type(self).__name__, "begin to train on trainer") + out_losses = [] + for i in six.moves.xrange(RUN_STEP): + loss = exe.run(fluid.default_main_program(), fetch_list=[avg_cost]) + loss = loss[0] if loss else None + out_losses.append(loss) + print_to_err(type(self).__name__, "run step %d finished" % i) + print_to_err(type(self).__name__, "trainer run finished") + + if six.PY2: + print(pickle.dumps(out_losses)) + else: + sys.stdout.buffer.write(pickle.dumps(out_losses)) + + if args.save_model: + model_save_dir = "/tmp" + if fleet.worker_index() == 0: + model_save_dir_fluid = os.path.join(model_save_dir, + "fluid_persistables") + model_save_dir_fleet = os.path.join(model_save_dir, + "fleet_persistables") + infer_save_dir_fluid = os.path.join(model_save_dir, + "fluid_infer") + infer_save_dir_fleet = os.path.join(model_save_dir, + "fleet_infer") + else: + model_save_dir_fluid = os.path.join(model_save_dir, + "fluid_persistables_2") + model_save_dir_fleet = os.path.join(model_save_dir, + "fleet_persistables_2") + infer_save_dir_fluid = os.path.join(model_save_dir, + "fluid_infer_2") + infer_save_dir_fleet = os.path.join(model_save_dir, + "fleet_infer_2") + fluid.io.save_persistables(exe, model_save_dir_fluid, + fleet._origin_program) + fleet.save_persistables(executor=exe, dirname=model_save_dir_fleet) + feeded_var_names = [var.name for var in feed_var_list] + fluid.io.save_inference_model(infer_save_dir_fluid, + feeded_var_names, [avg_cost], exe, + fleet._origin_program) + fleet.save_inference_model(exe, infer_save_dir_fleet, + feeded_var_names, [avg_cost]) + def run_gpu_fleet_api_trainer(self, args): assert args.update_method == "nccl2" @@ -532,6 +593,7 @@ def runtime_main(test_class): parser.add_argument('--nccl_comm_num', type=int, required=False, default=1) parser.add_argument('--enable_backward_deps', action='store_true') parser.add_argument('--use_hallreduce', action='store_true') + parser.add_argument('--use_pipeline', action='store_true') parser.add_argument('--gpu_fleet_api', action='store_true') parser.add_argument('--use_local_sgd', action='store_true') parser.add_argument('--ut4grad_allreduce', action='store_true') @@ -566,6 +628,8 @@ def runtime_main(test_class): model.run_pserver(args) elif args.gpu_fleet_api: model.run_gpu_fleet_api_trainer(args) + elif args.use_pipeline: + model.run_pipeline_trainer(args) else: model.run_trainer(args) @@ -607,6 +671,7 @@ class TestDistBase(unittest.TestCase): self._dc_asgd = False # must use with async mode self._use_reader_alloc = True self._nccl2_mode = False + self._pipeline_mode = False self._mp_mode = False # FIXME(typhoonzero): I added this stupid argument to enable # testing allreduce layers, which users can call layers.allreduce @@ -892,6 +957,8 @@ class TestDistBase(unittest.TestCase): if self._use_dgc: tr_cmd += " --use_dgc" + if self._pipeline_mode: + tr_cmd += " --use_pipeline" if self._mp_mode: env = {"FLAGS_selected_gpus": "{}".format(trainer_id % 2)} @@ -978,6 +1045,51 @@ class TestDistBase(unittest.TestCase): print("outs[1]:", outs[1]) return pickle.loads(outs[0]), pickle.loads(outs[1]) + def _run_pipeline(self, model, envs, check_error_log, log_name): + # NOTE: we reuse ps_endpoints as nccl2 worker endpoints + worker_endpoints = self._ps_endpoints.split(",") + update_method = "nccl2" + + trainer_num = len(worker_endpoints) + + procs = [] + pipes = [] + for i in range(0, trainer_num): + tr_cmd, tr_env = self._get_nccl2_trainer_cmd( + model, worker_endpoints[i], update_method, i, trainer_num) + tr_env.update(envs) + tr_env['CUDA_VISIBLE_DEVICES'] = "0,1" + tr_env['NCCL_SHM_DISABLE'] = '1' + tr_env['FLAGS_selected_gpus'] = str(i) + tr_env['FLAGS_cudnn_deterministic'] = '0' + print("tr_cmd:{}, env: {}".format(tr_cmd, tr_env)) + + tr_pipe = open("/tmp/" + "tr{}_err.log".format(i), "wb") + + print_to_err( + type(self).__name__, + "going to start process {} with nccl2".format(i)) + tr_proc = subprocess.Popen( + tr_cmd.strip().split(" "), + stdout=subprocess.PIPE, + stderr=tr_pipe, + env=tr_env) + + procs.append(tr_proc) + pipes.append(tr_pipe) + + outs = [] + for i in range(0, trainer_num): + tr_out, tr_err = procs[i].communicate() + outs.append(tr_out) + pipes[i].close() + sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err)) + + if check_error_log: + print("outs[0]:", outs[0]) + print("outs[1]:", outs[1]) + return pickle.loads(outs[0]), pickle.loads(outs[1]) + def _get_required_envs(self, check_error_log=False, need_envs={}): # TODO(typhoonzero): should auto adapt GPU count on the machine. required_envs = { @@ -1032,6 +1144,9 @@ class TestDistBase(unittest.TestCase): False, check_error_log, log_name=log_name) + elif self._pipeline_mode: + tr0_losses, tr1_losses = self._run_pipeline( + model_file, required_envs, check_error_log, log_name=log_name) else: tr0_losses, tr1_losses = self._run_cluster( model_file, required_envs, check_error_log, log_name=log_name) @@ -1040,7 +1155,10 @@ class TestDistBase(unittest.TestCase): local_loss = local_losses[step_id] tr0_loss = tr0_losses[step_id] tr1_loss = tr1_losses[step_id] - dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2 + if self._pipeline_mode: + dist_loss = np.array([tr1_loss]) + else: + dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2 print("=======", local_loss, ":", dist_loss[0], "=======") self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py index adbb1268c6..d1abc83568 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_pipeline_meta_optimizer.py @@ -16,6 +16,8 @@ import unittest import paddle import os +paddle.enable_static() + class TestFleetMetaOptimizer(unittest.TestCase): def setUp(self): @@ -28,19 +30,14 @@ class TestFleetMetaOptimizer(unittest.TestCase): import paddle.distributed.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - with paddle.fluid.device_guard("cpu"): + with paddle.fluid.device_guard("gpu:0"): input_x = paddle.fluid.layers.data( name="x", shape=[32], dtype='float32') input_y = paddle.fluid.layers.data( name="y", shape=[1], dtype='int64') - data_loader = paddle.fluid.io.DataLoader.from_generator( - feed_list=[input_x, input_y], - capacity=64, - use_double_buffer=True, - iterable=False) fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') - with paddle.fluid.device_guard("gpu:0"): + with paddle.fluid.device_guard("gpu:1"): fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, diff --git a/python/paddle/fluid/tests/unittests/test_pipeline.py b/python/paddle/fluid/tests/unittests/test_pipeline.py index dd1cf29eff..2cedf8659b 100644 --- a/python/paddle/fluid/tests/unittests/test_pipeline.py +++ b/python/paddle/fluid/tests/unittests/test_pipeline.py @@ -13,212 +13,32 @@ # limitations under the License. from __future__ import print_function -import paddle -import paddle.fluid as fluid -import paddle.fluid.layers as layers -import numpy as np -import os -import shutil import unittest -import math - - -def conv_bn_layer(input, num_filters, filter_size, stride=1, groups=1, - act=None): - conv = fluid.layers.conv2d( - input=input, - num_filters=num_filters, - filter_size=filter_size, - stride=stride, - padding=(filter_size - 1) // 2, - groups=groups, - act=None, - bias_attr=False) - return fluid.layers.batch_norm( - input=conv, - act=act, ) - - -def shortcut(input, ch_out, stride, is_first): - ch_in = input.shape[1] - if ch_in != ch_out or stride != 1 or is_first == True: - return conv_bn_layer(input, ch_out, 1, stride) - else: - return input - - -def bottleneck_block(input, num_filters, stride): - conv0 = conv_bn_layer( - input=input, num_filters=num_filters, filter_size=1, act='relu') - conv1 = conv_bn_layer( - input=conv0, - num_filters=num_filters, - filter_size=3, - stride=stride, - act='relu') - conv2 = conv_bn_layer( - input=conv1, num_filters=num_filters * 4, filter_size=1, act=None) - - short = shortcut(input, num_filters * 4, stride, is_first=False) - - return fluid.layers.elementwise_add(x=short, y=conv2, act='relu') - - -def basic_block(input, num_filters, stride, is_first): - conv0 = conv_bn_layer( - input=input, - num_filters=num_filters, - filter_size=3, - act='relu', - stride=stride) - conv1 = conv_bn_layer( - input=conv0, num_filters=num_filters, filter_size=3, act=None) - short = shortcut(input, num_filters, stride, is_first) - return fluid.layers.elementwise_add(x=short, y=conv1, act='relu') - +from test_dist_base import TestDistBase -def build_network(input, layers=50, class_dim=1000): - supported_layers = [18, 34, 50, 101, 152] - assert layers in supported_layers - depth = None - if layers == 18: - depth = [2, 2, 2, 2] - elif layers == 34 or layers == 50: - depth = [3, 4, 6, 3] - elif layers == 101: - depth = [3, 4, 23, 3] - elif layers == 152: - depth = [3, 8, 36, 3] - num_filters = [64, 128, 256, 512] - with fluid.device_guard("cpu"): - conv = conv_bn_layer( - input=input, num_filters=64, filter_size=7, stride=2, act='relu') - conv = fluid.layers.pool2d( - input=conv, - pool_size=3, - pool_stride=2, - pool_padding=1, - pool_type='max') - if layers >= 50: - for block in range(len(depth)): - with fluid.device_guard("gpu:0"): - for i in range(depth[block]): - conv = bottleneck_block( - input=conv, - num_filters=num_filters[block], - stride=2 if i == 0 and block != 0 else 1) - - with fluid.device_guard("gpu:0"): - pool = fluid.layers.pool2d( - input=conv, pool_size=7, pool_type='avg', global_pooling=True) - stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0) - out = fluid.layers.fc( - input=pool, - size=class_dim, - param_attr=fluid.param_attr.ParamAttr( - initializer=fluid.initializer.Uniform(-stdv, stdv))) - else: - for block in range(len(depth)): - with fluid.device_guard("gpu:0"): - for i in range(depth[block]): - conv = basic_block( - input=conv, - num_filters=num_filters[block], - stride=2 if i == 0 and block != 0 else 1, - is_first=block == i == 0) - with fluid.device_guard("gpu:0"): - pool = fluid.layers.pool2d( - input=conv, pool_size=7, pool_type='avg', global_pooling=True) - stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0) - out = fluid.layers.fc( - input=pool, - size=class_dim, - param_attr=fluid.param_attr.ParamAttr( - initializer=fluid.initializer.Uniform(-stdv, stdv))) - return out - - -class TestPipeline(unittest.TestCase): - """ TestCases for Pipeline Training. """ - - def _run(self, debug): - main_prog = fluid.Program() - startup_prog = fluid.Program() - with fluid.program_guard(main_prog, startup_prog): - with fluid.device_guard("cpu"): - image = fluid.layers.data( - name="image", shape=[3, 224, 224], dtype="float32") - label = fluid.layers.data( - name="label", shape=[1], dtype="int64") - data_loader = fluid.io.DataLoader.from_generator( - feed_list=[image, label], - capacity=64, - use_double_buffer=True, - iterable=False) - fc = build_network(image, layers=50) - with fluid.device_guard("gpu:0"): - out, prob = fluid.layers.softmax_with_cross_entropy( - logits=fc, label=label, return_softmax=True) - loss = fluid.layers.mean(out) - acc_top1 = fluid.layers.accuracy(input=prob, label=label, k=1) - acc_top5 = fluid.layers.accuracy(input=prob, label=label, k=5) - - base_lr = 0.1 - passes = [30, 60, 80, 90] - total_images = 1281167 - steps_per_pass = total_images // 128 - bd = [steps_per_pass * p for p in passes] - lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] - lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr) - optimizer = fluid.optimizer.MomentumOptimizer( - lr_val, - momentum=0.9, - regularization=fluid.regularizer.L2Decay(1e-4)) - optimizer = fluid.optimizer.PipelineOptimizer( - optimizer, num_microbatches=2) - optimizer.minimize(loss) - - def train_reader(): - for _ in range(4): - img = np.random.random(size=[3, 224, 224]).astype('float32') - label = np.random.random(size=[1]).astype('int64') - yield img, label - - data_loader.set_sample_generator(train_reader, batch_size=1) - place = fluid.CPUPlace() - - exe = fluid.Executor(place) - exe.run(startup_prog) - data_loader.start() - exe.train_from_dataset(main_prog, debug=debug) - - def test_pipeline(self): - self._run(False) - self._run(True) - - def test_pipeline_noneoptimizer(self): - with fluid.device_guard("gpu:0"): - x = fluid.layers.data( - name='x', shape=[1], dtype='int64', lod_level=0) - y = fluid.layers.data( - name='y', shape=[1], dtype='int64', lod_level=0) - emb_x = layers.embedding( - input=x, - param_attr=fluid.ParamAttr(name="embx"), - size=[10, 2], - is_sparse=False) - - fc = layers.fc(input=emb_x, - name="fc", - size=1, - num_flatten_dims=1, - bias_attr=False) - loss = layers.reduce_mean(fc) +import os +import paddle - optimizer = fluid.optimizer.SGD(learning_rate=0.5) - with self.assertRaises(ValueError): - optimizer = fluid.optimizer.PipelineOptimizer( - dict(), num_microbatches=2) +paddle.enable_static() +flag_name = os.path.splitext(__file__)[0] + + +class TestPipeline(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._use_reduce = False + self._use_reader_alloc = False + self._pipeline_mode = True + self._nccl_comm_num = 1 + + def test_dist_train(self): + import paddle.fluid as fluid + if fluid.core.is_compiled_with_cuda(): + self.check_with_place( + "pipeline_mnist.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) if __name__ == '__main__': -- GitLab