Unverified commit f77a78cd, authored by lilong12, committed by GitHub

enable pipeline to run with Executor.run() (#28373)

* update, test=develop
Parent 9f642ed8
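For context, a minimal usage sketch of how a pipeline model is expected to be built and then driven through Executor.run() after this change, loosely adapted from the PipelineOptimizer docstring touched later in this diff. The toy embedding network, the device_guard placement, and paddle.enable_static() are illustrative assumptions, not part of the commit.

```python
import numpy as np
import paddle
import paddle.fluid as fluid

paddle.enable_static()

with fluid.device_guard("gpu:0"):
    x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
    y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
    data_loader = fluid.io.DataLoader.from_generator(
        feed_list=[x, y], capacity=64, use_double_buffer=True, iterable=False)
    emb_x = fluid.layers.embedding(input=x, size=[10, 2],
                                   param_attr=fluid.ParamAttr(name="embx"))
    emb_y = fluid.layers.embedding(input=y, size=[10, 2],
                                   param_attr=fluid.ParamAttr(name="emby"))

with fluid.device_guard("gpu:1"):
    concat = fluid.layers.concat([emb_x, emb_y], axis=1)
    fc = fluid.layers.fc(input=concat, size=1, bias_attr=False)
    loss = fluid.layers.reduce_mean(fc)

optimizer = fluid.optimizer.SGD(learning_rate=0.5)
optimizer = fluid.optimizer.PipelineOptimizer(optimizer, num_microbatches=2)
optimizer.minimize(loss)

def train_reader():
    for _ in range(4):
        yield np.random.randint(0, 10, size=[1]), np.random.randint(0, 10, size=[1])

data_loader.set_sample_generator(train_reader, batch_size=1)

exe = fluid.Executor(fluid.CUDAPlace(0))
# With this change, both the (pipeline-split) startup and main programs can be
# handed to Executor.run(); the executor redirects them internally.
exe.run(fluid.default_startup_program())
data_loader.start()
exe.run(fluid.default_main_program())
data_loader.reset()
```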
......@@ -540,7 +540,7 @@ class HeterBoxWorker : public HogwildWorker {
#if defined(PADDLE_WITH_NCCL)
class SectionWorker : public DeviceWorker {
public:
SectionWorker() { local_batch_id_ = 0; }
SectionWorker() {}
~SectionWorker() override {}
void Initialize(const TrainerDesc& desc) override;
......@@ -549,13 +549,12 @@ class SectionWorker : public DeviceWorker {
void CreateDeviceResource(const ProgramDesc& main_prog) override{};
void TrainFiles() override;
void TrainFilesWithProfiler() override;
void TrainFilesWithProfiler() override{};
void PrintFetchVars() override {}
const platform::Place& place() const { return place_; }
void SetSectionIndex(int section_id) { section_id_ = section_id; }
void SetDeviceIndex(int tid) override {}
void SetThreadIndex(int thread_id) { thread_id_ = thread_id; }
void SetMicrobatchNum(int num) { num_microbatches_ = num; }
......@@ -566,13 +565,8 @@ class SectionWorker : public DeviceWorker {
void SetSkipVars(const std::vector<std::string>& skip_vars) {
skip_vars_ = skip_vars;
}
static void ResetBatchId() { batch_id_ = 0; }
static void ResetThreadCompletedFlag() { threads_completed = false; }
static std::atomic<int> cpu_id_;
protected:
void AutoSetCPUAffinity(bool reuse);
int section_id_;
int thread_id_;
int num_microbatches_;
......@@ -581,12 +575,8 @@ class SectionWorker : public DeviceWorker {
const Scope* minibatch_scope_;
std::vector<std::unique_ptr<OperatorBase>> ops_;
static std::mutex thread_mutex;
static std::condition_variable thread_condition;
static bool threads_completed;
std::shared_ptr<framework::ProgramDesc> program_;
static uint64_t batch_id_;
uint64_t local_batch_id_;
platform::DeviceContext* dev_ctx_ = nullptr;
};
......
......@@ -13,6 +13,7 @@
// limitations under the License.
#if defined(PADDLE_WITH_NCCL)
#include <map>
#include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/trainer.h"
......@@ -26,83 +27,25 @@ void PipelineTrainer::Initialize(const TrainerDesc& trainer_desc,
const auto& section_params = trainer_desc.section_param();
num_microbatches_ = section_params.num_microbatches();
VLOG(3) << "Number of microbatches per minibatch: " << num_microbatches_;
section_num_ = section_params.section_config_size();
VLOG(3) << "Number of program sections: " << section_num_;
trainer_desc_ = trainer_desc;
start_cpu_core_id_ = section_params.start_cpu_core_id();
SetDataset(dataset);
ParseDumpConfig(trainer_desc);
// get filelist from trainer_desc here
const std::vector<paddle::framework::DataFeed*> readers =
dataset->GetReaders();
VLOG(3) << "readers num: " << readers.size();
int num_readers = readers.size();
PADDLE_ENFORCE_EQ(num_readers, 1,
platform::errors::InvalidArgument(
"Number of dataset readers for pipeline "
"must be 1 now, but the value you give is %d.",
num_readers));
auto* reader = readers[0];
feed_var_names_ = reader->GetUseSlotAlias();
workers_.resize(section_num_);
for (int i = 0; i < section_num_; ++i) {
const auto& section_config = section_params.section_config(i);
platform::Place place;
const auto& section_config = section_params.section_config();
int place_id = section_config.place_id();
switch (section_config.place()) {
case SectionConfig::CPUPlace:
place = platform::CPUPlace();
break;
case SectionConfig::CUDAPlace:
// Note that one section has at most one GPU place in one pipeline
PADDLE_ENFORCE_GE(
place_id, 0,
platform::errors::InvalidArgument(
"The place_id value for CUDAPlace shoud be greater "
"than or equal to 0, but the value you give is %d.",
place_id));
place = platform::CUDAPlace(place_id);
break;
case SectionConfig::CUDAPinnedPlace:
place = platform::CUDAPinnedPlace();
break;
default:
PADDLE_ENFORCE_NOT_NULL(nullptr,
platform::errors::InvalidArgument(
"Unkown place type in SectionConfig: %d",
section_config.place()));
}
places_.emplace_back(place);
VLOG(3) << "Device worker place: " << place << ", device id: " << place_id
<< ", section: " << i;
workers_[i] = DeviceWorkerFactory::CreateDeviceWorker(
place_ = platform::CUDAPlace(place_id);
worker_ = DeviceWorkerFactory::CreateDeviceWorker(
trainer_desc.device_worker_name());
auto this_worker =
std::dynamic_pointer_cast<paddle::framework::SectionWorker>(
workers_[i]);
if (i == 0) {
// we only set reader for the first section
this_worker->SetDataFeed(reader);
this_worker->SetReaderPlace(place);
}
this_worker->SetThreadIndex(i);
this_worker->SetSectionIndex(i);
this_worker->SetPlace(place);
std::dynamic_pointer_cast<paddle::framework::SectionWorker>(worker_);
this_worker->SetPlace(place_);
this_worker->Initialize(trainer_desc);
this_worker->SetMicrobatchNum(num_microbatches_);
}
// set debug here
SetDebug(trainer_desc.debug());
}
void PipelineTrainer::InitOtherEnv(const ProgramDesc& main_program) {
if (need_dump_field_) {
InitDumpEnv();
}
VLOG(3) << "init other env done.";
}
std::string PipelineTrainer::GetDumpPath(int tid) {
......@@ -119,143 +62,87 @@ void PipelineTrainer::InitDumpEnv() {
}
}
void PipelineTrainer::CopyParameters(int section_id, int microbatch_id,
void PipelineTrainer::CopyParameters(int microbatch_id,
const ProgramDesc& program,
const platform::Place& place) {
auto& global_block = program.Block(0);
std::map<std::string, int> param_map;
for (auto& var : global_block.AllVars()) {
int is_feed_var =
std::count(feed_var_names_.begin(), feed_var_names_.end(), var->Name());
if ((var->Persistable() || is_feed_var) && microbatch_id == 0) {
if (is_feed_var) {
auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name());
VLOG(3) << "data name: " << var->Name() << ", ptr: " << new_ptr;
InitializeVariable(new_ptr, var->GetType());
} else {
auto* ptr = root_scope_->FindVar(var->Name());
auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name());
VLOG(3) << "Create persistable var " << var->Name() << " for minibatch "
<< section_id << ", which pointer is " << new_ptr;
InitializeVariable(new_ptr, var->GetType());
const LoDTensor& root_tensor = ptr->Get<LoDTensor>();
LoDTensor* minibatch_tensor = new_ptr->GetMutable<LoDTensor>();
TensorCopy(*static_cast<const Tensor*>(&root_tensor), place,
static_cast<Tensor*>(minibatch_tensor));
}
} else if (!var->Persistable() && !is_feed_var) {
auto* ptr =
microbatch_scopes_[section_id][microbatch_id]->Var(var->Name());
VLOG(3) << "Create variable " << var->Name() << " for section "
<< section_id << " microbatch " << microbatch_id
<< ", which pointer is " << ptr;
InitializeVariable(ptr, var->GetType());
if (var->Persistable()) {
param_map[var->Name()] = 1;
}
}
}
void PipelineTrainer::GetSkipVars(int section_id, const ProgramDesc& program) {
auto& global_block = program.Block(0);
for (auto& op : global_block.AllOps()) {
if (op->Type() != "enqueue") {
continue;
for (auto& var : global_block.AllVars()) {
bool is_param_grad = false;
size_t pos = 0;
if ((pos = var->Name().find(kGradVarSuffix)) != std::string::npos) {
auto prefix_name = var->Name().substr(0, pos);
if (param_map.find(prefix_name) != param_map.end()) {
is_param_grad = true;
}
auto input_arg_names = op->InputArgumentNames();
PADDLE_ENFORCE_EQ(input_arg_names.size(), 1,
platform::errors::InvalidArgument(
"Number of input arguments for enqueue op must be 1, "
"but the value is %d.",
input_arg_names.size()));
std::string input_arg_name = input_arg_names[0];
if (input_arg_name.rfind("@GRAD") != input_arg_name.size() - 5) {
skip_vars_[section_id].emplace_back(input_arg_name);
VLOG(3) << "add skip var name: " << input_arg_name;
}
if (var->Persistable() && microbatch_id == 0) {
auto* ptr = root_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
VLOG(3) << "Create persistable var: " << var->Name()
<< ", which pointer is " << ptr;
} else if (is_param_grad && microbatch_id == 0) {
auto* ptr = minibatch_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
VLOG(3) << "Create grad for persistable var: " << var->Name()
<< ", which pointer is " << ptr;
} else if (!var->Persistable() && !is_param_grad) {
auto* ptr = microbatch_scopes_[microbatch_id]->Var(var->Name());
VLOG(3) << "Create variable " << var->Name() << " for microbatch "
<< microbatch_id << ", which pointer is " << ptr;
InitializeVariable(ptr, var->GetType());
}
}
}
void PipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place) {
PADDLE_ENFORCE_NOT_NULL(root_scope_,
platform::errors::InvalidArgument(
"root_scope pointer can not be nullptr"));
auto start_cpu_id = trainer_desc_.section_param().start_cpu_core_id();
SectionWorker::cpu_id_.store(start_cpu_id);
minibatch_scopes_.resize(section_num_);
microbatch_scopes_.resize(section_num_);
skip_vars_.resize(section_num_);
PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument(
"root_scope_ can not be nullptr"));
microbatch_scopes_.resize(num_microbatches_);
VLOG(3) << "Init ScopeQueues and create all scopes";
for (int i = 0; i < section_num_; ++i) {
minibatch_scopes_[i] = &root_scope_->NewScope();
VLOG(3) << "Create minibatch and microbatch scopes...";
minibatch_scope_ = &root_scope_->NewScope();
std::shared_ptr<framework::ProgramDesc> program;
program.reset(new ProgramDesc(
trainer_desc_.section_param().section_config(i).program_desc()));
microbatch_scopes_[i].resize(num_microbatches_);
trainer_desc_.section_param().section_config().program_desc()));
for (int j = 0; j < num_microbatches_; ++j) {
microbatch_scopes_[i][j] = &minibatch_scopes_[i]->NewScope();
CopyParameters(i, j, *program, places_[i]);
}
GetSkipVars(i, *program);
microbatch_scopes_[j] = &minibatch_scope_->NewScope();
CopyParameters(j, *program, place_);
}
for (int i = 0; i < section_num_; ++i) {
auto this_worker =
std::dynamic_pointer_cast<paddle::framework::SectionWorker>(
workers_[i]);
std::dynamic_pointer_cast<paddle::framework::SectionWorker>(worker_);
this_worker->SetRootScope(root_scope_);
this_worker->SetMinibatchScope(minibatch_scopes_[i]);
this_worker->SetMicrobatchScopes(microbatch_scopes_[i]);
this_worker->SetSkipVars(skip_vars_[i]);
}
this_worker->SetMinibatchScope(minibatch_scope_);
this_worker->SetMicrobatchScopes(microbatch_scopes_);
}
void PipelineTrainer::Run() {
VLOG(3) << "Going to run";
for (int i = 0; i < section_num_; ++i) {
if (!debug_) {
section_threads_.push_back(
std::thread(&DeviceWorker::TrainFiles, workers_[i].get()));
} else {
section_threads_.push_back(std::thread(
&DeviceWorker::TrainFilesWithProfiler, workers_[i].get()));
}
}
VLOG(5) << "Going to run PipelineTrainer::Run()";
section_thread_ = std::async(&DeviceWorker::TrainFiles, worker_.get());
}
void PipelineTrainer::Finalize() {
for (auto& th : section_threads_) {
th.join();
try {
section_thread_.get();
} catch (platform::EOFException& e) {
std::rethrow_exception(std::current_exception());
}
if (need_dump_field_) {
FinalizeDumpEnv();
}
VLOG(3) << "copying back parameters. ";
for (int i = 0; i < section_num_; ++i) {
std::shared_ptr<framework::ProgramDesc> program;
program.reset(new ProgramDesc(
trainer_desc_.section_param().section_config(i).program_desc()));
for (int j = 0; j < num_microbatches_; ++j) {
auto& global_block = program->Block(0);
for (auto& var : global_block.AllVars()) {
if (var->Persistable()) {
auto* ptr = root_scope_->FindVar(var->Name());
LoDTensor* root_tensor = ptr->GetMutable<LoDTensor>();
auto* minibatch_ptr = minibatch_scopes_[i]->Var(var->Name());
const LoDTensor& minibatch_tensor = minibatch_ptr->Get<LoDTensor>();
TensorCopy(*static_cast<const Tensor*>(&minibatch_tensor), places_[0],
static_cast<Tensor*>(root_tensor));
VLOG(4) << "Copy persitable var " << var->Name() << " to root scope";
}
}
}
}
root_scope_->DropKids();
SectionWorker::ResetBatchId();
SectionWorker::ResetThreadCompletedFlag();
}
Scope* PipelineTrainer::GetWorkerScope(int thread_id) {
return microbatch_scopes_[thread_id][0];
return microbatch_scopes_[0];
}
} // end namespace framework
......
......@@ -30,295 +30,40 @@ limitations under the License. */
namespace paddle {
namespace framework {
std::atomic<int> SectionWorker::cpu_id_(0);
std::mutex SectionWorker::thread_mutex;
std::condition_variable SectionWorker::thread_condition;
bool SectionWorker::threads_completed = false;
uint64_t SectionWorker::batch_id_(0);
void SectionWorker::Initialize(const TrainerDesc& desc) {
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
program_.reset(new ProgramDesc(
desc.section_param().section_config(section_id_).program_desc()));
program_.reset(
new ProgramDesc(desc.section_param().section_config().program_desc()));
for (auto& op_desc : program_->Block(0).AllOps()) {
ops_.push_back(OpRegistry::CreateOp(*op_desc));
}
}
void SectionWorker::AutoSetCPUAffinity(bool reuse) {
int thread_cpu_id = cpu_id_.fetch_add(1);
unsigned concurrency_cap = std::thread::hardware_concurrency();
unsigned proc = thread_cpu_id;
if (proc >= concurrency_cap) {
if (reuse) {
proc %= concurrency_cap;
} else {
LOG(INFO) << "All " << concurrency_cap
<< " CPUs have been set affinities. Fail to set "
<< thread_cpu_id << "th thread";
return;
}
}
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(proc, &mask);
if (-1 == sched_setaffinity(0, sizeof(mask), &mask)) {
LOG(WARNING) << "Fail to set thread affinity to CPU " << proc;
return;
}
CPU_ZERO(&mask);
if ((0 != sched_getaffinity(0, sizeof(mask), &mask)) ||
(0 == CPU_ISSET(proc, &mask))) {
LOG(WARNING) << "Fail to set thread affinity to CPU " << proc;
}
VLOG(3) << "Set " << thread_cpu_id << "th thread affinity to CPU " << proc;
}
void SectionWorker::TrainFiles() {
VLOG(3) << "begin section_worker TrainFiles";
AutoSetCPUAffinity(true);
VLOG(5) << "begin section_worker TrainFiles";
int64_t max_memory_size = 0;
int64_t max_memory_size = GetEagerDeletionThreshold();
std::unique_ptr<GarbageCollector> gc;
auto unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_);
if (max_memory_size >= 0) {
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
if (IsFastEagerDeletionModeEnabled()) {
gc.reset(new UnsafeFastGPUGarbageCollector(
BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size));
} else {
gc.reset(new DefaultStreamGarbageCollector(
BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size));
}
} else if (platform::is_cpu_place(place_)) {
#endif
gc.reset(new CPUGarbageCollector(
BOOST_GET_CONST(platform::CPUPlace, place_), max_memory_size));
#ifdef PADDLE_WITH_CUDA
}
#endif
if (thread_id_ == 0) {
while (true) {
// Start a minibatch.
for (int i = 0; i < num_microbatches_; ++i) {
try {
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch
// to avoid increasing the @LR_DECAY_STEP@ multiple times.
bool run_first_mbatch =
op_role == static_cast<int>(OpRole::kForward) ||
op_role == (static_cast<int>(OpRole::kForward) |
static_cast<int>(OpRole::kLoss)) ||
op_role == static_cast<int>(OpRole::kLRSched);
bool run_others = op_role == static_cast<int>(OpRole::kForward) ||
op_role == (static_cast<int>(OpRole::kForward) |
static_cast<int>(OpRole::kLoss));
if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
}
}
} catch (platform::EOFException&) {
std::unique_lock<std::mutex> lk(thread_mutex);
threads_completed = true;
VLOG(3) << "thread " << thread_id_ << " completed.";
VLOG(3) << "called notify all";
thread_condition.notify_all();
VLOG(0) << "EOF encountered";
return;
}
if (i == 0) {
VLOG(3) << "called notify all";
std::unique_lock<std::mutex> lk(thread_mutex);
batch_id_ += 1;
thread_condition.notify_all();
}
}
// backward pass
for (int i = 0; i < num_microbatches_; ++i) {
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) ||
op_role == (static_cast<int>(OpRole::kBackward) |
static_cast<int>(OpRole::kLoss))) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
}
}
}
// update pass
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for minibatch scope";
op->Run(*microbatch_scopes_[0], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1],
op.get(), unused_vars_, gc.get());
}
}
}
dev_ctx_->Wait();
}
} else {
while (true) {
{
PADDLE_ENFORCE_LE(
local_batch_id_, batch_id_,
platform::errors::InvalidArgument(
"local_batch_id_ (%d) must be less than or equal to "
"batch_id_ (%d)",
local_batch_id_, batch_id_));
std::unique_lock<std::mutex> lk(thread_mutex);
if (local_batch_id_ == batch_id_ && !threads_completed) {
thread_condition.wait(lk);
}
VLOG(3) << "thread " << thread_id_ << " local_batch_id_ "
<< local_batch_id_ << " batch_id_ " << batch_id_;
if (threads_completed) {
VLOG(3) << "thread " << thread_id_ << " completed.";
lk.unlock();
return;
}
lk.unlock();
local_batch_id_ += 1;
}
// forward pass:
for (int i = 0; i < num_microbatches_; ++i) {
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch
// to avoid increasing the @LR_DECAY_STEP@ multiple times.
bool run_first_mbatch =
op_role == static_cast<int>(OpRole::kForward) ||
op_role == (static_cast<int>(OpRole::kForward) |
static_cast<int>(OpRole::kLoss)) ||
op_role == static_cast<int>(OpRole::kLRSched);
bool run_others = op_role == static_cast<int>(OpRole::kForward) ||
op_role == (static_cast<int>(OpRole::kForward) |
static_cast<int>(OpRole::kLoss));
if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
}
}
}
// backward pass
for (int i = 0; i < num_microbatches_; ++i) {
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) ||
op_role == (static_cast<int>(OpRole::kBackward) |
static_cast<int>(OpRole::kLoss))) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
}
}
}
// update pass
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for minibatch scope";
op->Run(*microbatch_scopes_[0], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1],
op.get(), unused_vars_, gc.get());
}
}
}
dev_ctx_->Wait();
}
}
}
void SectionWorker::TrainFilesWithProfiler() {
VLOG(3) << "begin section_worker TrainFiles with profiler";
AutoSetCPUAffinity(true);
platform::Timer batch_timer;
platform::Timer timeline;
std::vector<double> op_total_time;
std::vector<std::string> op_name;
std::vector<double> op_max_time;
std::vector<double> op_min_time;
std::vector<uint64_t> op_count;
for (auto& op : ops_) {
op_name.push_back(op->Type());
}
op_total_time.resize(ops_.size());
op_max_time.resize(ops_.size());
op_min_time.resize(ops_.size());
for (size_t i = 0; i < op_min_time.size(); ++i) {
op_min_time[i] = DBL_MAX;
}
op_count.resize(ops_.size());
int64_t max_memory_size = 0;
std::unique_ptr<GarbageCollector> gc;
// const std::vector<std::string> keep_vars;
auto unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_);
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
if (IsFastEagerDeletionModeEnabled()) {
gc.reset(new UnsafeFastGPUGarbageCollector(
BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size));
} else {
gc.reset(new DefaultStreamGarbageCollector(
BOOST_GET_CONST(platform::CUDAPlace, place_), max_memory_size));
}
} else if (platform::is_cpu_place(place_)) {
#endif
gc.reset(new CPUGarbageCollector(
BOOST_GET_CONST(platform::CPUPlace, place_), max_memory_size));
#ifdef PADDLE_WITH_CUDA
}
#endif
if (thread_id_ == 0) {
while (true) {
// Start a minibatch.
// int batch_size = 0;
batch_timer.Start();
for (int i = 0; i < num_microbatches_; ++i) {
try {
int op_idx = 0;
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch
// to avoid increasing the @LR_DECAY_STEP@ multiple times.
bool run_first_mbatch =
op_role == static_cast<int>(OpRole::kForward) ||
bool run_first_mbatch = op_role == static_cast<int>(OpRole::kForward) ||
op_role == (static_cast<int>(OpRole::kForward) |
static_cast<int>(OpRole::kLoss)) ||
op_role == static_cast<int>(OpRole::kLRSched);
......@@ -326,244 +71,53 @@ void SectionWorker::TrainFilesWithProfiler() {
op_role == (static_cast<int>(OpRole::kForward) |
static_cast<int>(OpRole::kLoss));
if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
timeline.Start();
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
if (time > op_max_time[op_idx]) {
op_max_time[op_idx] = time;
}
if (time < op_min_time[op_idx]) {
op_min_time[op_idx] = time;
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
}
op_idx++;
}
} catch (platform::EOFException&) {
std::unique_lock<std::mutex> lk(thread_mutex);
threads_completed = true;
VLOG(3) << "thread " << thread_id_ << " completed.";
VLOG(3) << "called notify all";
thread_condition.notify_all();
VLOG(0) << "EOF encountered";
VLOG(0) << "============timeline============";
for (size_t i = 0; i < ops_.size(); ++i) {
VLOG(0) << "op: " << op_name[i] << ", max_time: " << op_max_time[i]
<< ", min_time: " << op_min_time[i]
<< ", mean_time: " << op_total_time[i] / op_count[i];
}
VLOG(0) << "================================";
return;
}
if (i == 0) {
VLOG(3) << "called notify all";
std::unique_lock<std::mutex> lk(thread_mutex);
batch_id_ += 1;
thread_condition.notify_all();
}
}
// backward pass
for (int i = 0; i < num_microbatches_; ++i) {
int op_idx = 0;
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) ||
op_role == (static_cast<int>(OpRole::kBackward) |
static_cast<int>(OpRole::kLoss))) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
timeline.Start();
VLOG(3) << "Forward: running op " << op->Type() << " for micro-batch "
<< i;
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
if (time > op_max_time[op_idx]) {
op_max_time[op_idx] = time;
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_,
gc.get());
}
if (time < op_min_time[op_idx]) {
op_min_time[op_idx] = time;
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
}
op_idx++;
}
}
// update pass
int op_idx = 0;
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for minibatch scope";
timeline.Start();
op->Run(*microbatch_scopes_[0], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1],
op.get(), unused_vars_, gc.get());
}
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
if (time > op_max_time[op_idx]) {
op_max_time[op_idx] = time;
}
if (time < op_min_time[op_idx]) {
op_min_time[op_idx] = time;
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
}
op_idx++;
}
dev_ctx_->Wait();
batch_timer.Pause();
VLOG(0) << "batch time: " << batch_timer.ElapsedUS();
}
} else {
while (true) {
{
PADDLE_ENFORCE_LE(
local_batch_id_, batch_id_,
platform::errors::InvalidArgument(
"local_batch_id_ (%d) must be less than or equal to "
"batch_id_ (%d)",
local_batch_id_, batch_id_));
std::unique_lock<std::mutex> lk(thread_mutex);
if (local_batch_id_ == batch_id_ && !threads_completed) {
thread_condition.wait(lk);
}
VLOG(3) << "thread " << thread_id_ << " local_batch_id_ "
<< local_batch_id_ << " batch_id_ " << batch_id_;
if (threads_completed) {
VLOG(3) << "thread " << thread_id_ << " completed.";
lk.unlock();
VLOG(0) << "============timeline============";
for (size_t i = 0; i < ops_.size(); ++i) {
VLOG(0) << "op: " << op_name[i] << ", max_time: " << op_max_time[i]
<< ", min_time: " << op_min_time[i]
<< ", mean_time: " << op_total_time[i] / op_count[i];
}
VLOG(0) << "================================";
return;
}
lk.unlock();
local_batch_id_ += 1;
}
// forward pass:
for (int i = 0; i < num_microbatches_; ++i) {
int op_idx = 0;
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch
// to avoid increasing the @LR_DECAY_STEP@ multiple times.
bool run_first_mbatch =
op_role == static_cast<int>(OpRole::kForward) ||
op_role == (static_cast<int>(OpRole::kForward) |
static_cast<int>(OpRole::kLoss)) ||
op_role == static_cast<int>(OpRole::kLRSched);
bool run_others = op_role == static_cast<int>(OpRole::kForward) ||
op_role == (static_cast<int>(OpRole::kForward) |
static_cast<int>(OpRole::kLoss));
if ((i == 0 && run_first_mbatch) || (i != 0 && run_others)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
timeline.Start();
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
if (time > op_max_time[op_idx]) {
op_max_time[op_idx] = time;
}
if (time < op_min_time[op_idx]) {
op_min_time[op_idx] = time;
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
}
op_idx++;
}
cudaDeviceSynchronize();
}
// backward pass
for (int i = 0; i < num_microbatches_; ++i) {
int op_idx = 0;
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) ||
op_role == (static_cast<int>(OpRole::kBackward) |
static_cast<int>(OpRole::kLoss))) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
timeline.Start();
VLOG(3) << "Backward: running op " << op->Type() << " for micro-batch "
<< i;
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_,
gc.get());
}
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
if (time > op_max_time[op_idx]) {
op_max_time[op_idx] = time;
}
if (time < op_min_time[op_idx]) {
op_min_time[op_idx] = time;
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
}
op_idx++;
}
cudaDeviceSynchronize();
}
// update pass
int op_idx = 0;
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for minibatch scope";
timeline.Start();
VLOG(3) << "Update: running op " << op->Type();
op->Run(*microbatch_scopes_[0], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1],
op.get(), unused_vars_, gc.get());
DeleteUnusedTensors(*microbatch_scopes_[0], op.get(), unused_vars_,
gc.get());
}
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
if (time > op_max_time[op_idx]) {
op_max_time[op_idx] = time;
}
if (time < op_min_time[op_idx]) {
op_min_time[op_idx] = time;
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
}
op_idx++;
}
dev_ctx_->Wait();
}
}
++batch_id_;
}
} // namespace framework
} // namespace paddle
#endif
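To summarize the control flow that the rewritten SectionWorker::TrainFiles runs on its single section (forward over all micro-batches, with LRSched ops only on the first one, then backward per micro-batch, then one optimize pass on the mini-batch scope), here is a self-contained Python sketch. The Op class and the role names are illustrative stand-ins, not Paddle APIs, and the Forward|Loss combinations are folded into a single role for brevity.

```python
from dataclasses import dataclass

# Simplified op roles; the real code also distinguishes Forward|Loss, Backward|Loss.
FORWARD, BACKWARD, OPTIMIZE, LRSCHED = "forward", "backward", "optimize", "lr_sched"

@dataclass
class Op:
    name: str
    role: str

    def run(self, scope):
        print("run %s (%s) on %s" % (self.name, self.role, scope))

def run_minibatch(ops, num_microbatches):
    # forward pass, one micro-batch at a time; LRSched ops only on the first one
    for i in range(num_microbatches):
        for op in ops:
            if op.role == FORWARD or (i == 0 and op.role == LRSCHED):
                op.run("microbatch_scope[%d]" % i)
    # backward pass, one micro-batch at a time
    for i in range(num_microbatches):
        for op in ops:
            if op.role == BACKWARD:
                op.run("microbatch_scope[%d]" % i)
    # update pass, once per mini-batch (run on the first micro-batch scope)
    for op in ops:
        if op.role == OPTIMIZE:
            op.run("microbatch_scope[0]")

run_minibatch(
    [Op("lr_decay", LRSCHED), Op("fc_fwd", FORWARD),
     Op("fc_grad", BACKWARD), Op("sgd", OPTIMIZE)],
    num_microbatches=2)
```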
......@@ -290,29 +290,22 @@ class PipelineTrainer : public TrainerBase {
virtual Scope* GetWorkerScope(int thread_id);
void InitDumpEnv() override;
virtual std::string GetDumpPath(int tid);
void GetSkipVars(int section_id, const ProgramDesc& main_program);
void GetSkipVars(const ProgramDesc& main_program);
protected:
int section_num_;
int num_microbatches_;
int start_cpu_core_id_;
std::vector<std::string> feed_var_names_;
std::vector<platform::Place> places_;
std::vector<std::vector<std::string>> skip_vars_;
platform::Place place_;
std::vector<std::string> skip_vars_;
TrainerDesc trainer_desc_;
std::vector<std::thread> section_threads_;
// worker: [section_id]
std::vector<std::shared_ptr<paddle::framework::DeviceWorker>> workers_;
// minibatch_scopes_: [section_id]
std::vector<Scope*> minibatch_scopes_;
// microbatch_scopes_: [section_id][microbatch_id]
std::vector<std::vector<Scope*>> microbatch_scopes_;
std::future<void> section_thread_;
std::shared_ptr<paddle::framework::DeviceWorker> worker_;
Scope* minibatch_scope_;
// microbatch_scopes_: [microbatch_id]
std::vector<Scope*> microbatch_scopes_;
void CopyParameters(int section_id, int microbatch_id,
const ProgramDesc& program, const platform::Place& place);
bool isPersistableVarGrad(std::string name);
bool isPersistable(VarDesc* var);
void CopyParameters(int microbatch_id, const ProgramDesc& program,
const platform::Place& place);
};
#endif
......
......@@ -86,7 +86,7 @@ message DownpourWorkerParameter {
}
message SectionWorkerParameter {
repeated SectionConfig section_config = 1;
optional SectionConfig section_config = 1;
optional int32 queue_size = 2 [ default = 1 ];
optional int64 sync_steps = 3 [ default = 1 ];
optional int32 start_cpu_core_id = 4 [ default = 1 ];
......
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
from __future__ import print_function
from __future__ import division
import paddle.fluid as fluid
from paddle.fluid import core, unique_name
......@@ -21,9 +22,55 @@ from .meta_optimizer_base import MetaOptimizerBase
from .common import OpRole, OP_ROLE_KEY, OP_ROLE_VAR_KEY, CollectiveHelper, is_update_op, is_loss_grad_op, is_backward_op, is_optimizer_op
class PipelineHelper(CollectiveHelper):
def __init__(self, role_maker, nrings=1, wait_port='6174'):
super(PipelineHelper, self).__init__(role_maker, nrings, wait_port)
def _get_node_num(endpoints):
ss = set()
for ep in endpoints:
ip = ep.split(":")[0].strip()
if ip not in ss:
ss.add(ip)
return len(ss)
class PipelineHelper(object):
def __init__(self, role_maker, wait_port='6174'):
self.wait_port = wait_port
self.role_maker = role_maker
def update_startup_program(self,
startup_program=None,
inner_parallelism=None):
self.startup_program = startup_program
endpoints = self.role_maker._get_trainer_endpoints()
current_endpoint = endpoints[self.role_maker._worker_index()]
node_num = _get_node_num(endpoints)
assert len(endpoints) % node_num == 0
nranks = self.role_maker._worker_num()
rank = self.role_maker._worker_index()
# Create ring 0 for all gpus in a pipeline
pipeline_endpoints = []
pipeline_rank = rank % inner_parallelism
pipeline_id = rank // inner_parallelism
for idx, ep in enumerate(endpoints):
if idx // inner_parallelism == pipeline_id:
pipeline_endpoints.append(ep)
self._init_communicator(self.startup_program, current_endpoint,
pipeline_endpoints, pipeline_rank, 0,
self.wait_port)
pipeline_num = len(endpoints) // inner_parallelism
if pipeline_num == 1: return
# Create rings for gpus with the same gpu id
eps = []
local_rank = self.role_maker._worker_index() % inner_parallelism
ring_id = local_rank + 1
for i in range(pipeline_num):
eps.append(endpoints[i * inner_parallelism + local_rank])
temp_rank = self.role_maker._worker_index() // inner_parallelism
self._init_communicator(self.startup_program, current_endpoint, eps,
temp_rank, ring_id, self.wait_port)
self._broadcast_params(ring_id)
def _init_communicator(self, program, current_endpoint, endpoints, rank,
ring_id, wait_port):
......@@ -46,9 +93,8 @@ class PipelineHelper(CollectiveHelper):
'rank': rank,
'endpoint': current_endpoint,
'other_endpoints': other_endpoints,
OP_ROLE_KEY: OpRole.Forward
OP_ROLE_KEY: OpRole.Forward,
})
block.append_op(
type='c_comm_init',
inputs={'X': nccl_id_var},
......@@ -58,12 +104,10 @@ class PipelineHelper(CollectiveHelper):
'rank': rank,
'ring_id': ring_id,
OP_ROLE_KEY: OpRole.Forward,
'device_id': OpRole.Forward
})
def _broadcast_params(self):
def _broadcast_params(self, ring_id):
block = self.startup_program.global_block()
ring_id = 0
for param in block.iter_parameters():
if param.is_distributed:
continue
......@@ -78,7 +122,6 @@ class PipelineHelper(CollectiveHelper):
OP_ROLE_KEY: OpRole.Forward
})
for ring_id in range(self.nrings):
block.append_op(
type='c_sync_comm_stream',
inputs={'X': param},
......@@ -99,8 +142,8 @@ class PipelineOptimizer(MetaOptimizerBase):
user_defined_strategy):
super(PipelineOptimizer, self)._set_basic_info(
loss, role_maker, user_defined_optimizer, user_defined_strategy)
num_microbatches = user_defined_strategy.pipeline_configs['micro_batch']
self.wrapped_opt = PO(self.inner_opt, num_microbatches=num_microbatches)
self.num_microbatches = user_defined_strategy.pipeline_configs[
'micro_batch']
def _can_apply(self):
if not self.role_maker._is_collective:
......@@ -115,29 +158,46 @@ class PipelineOptimizer(MetaOptimizerBase):
dist_strategy.pipeline_configs = {}
def _enable_strategy(self, dist_strategy, context):
# we do not support enable pipeline automatically right now
return
dist_strategy.pipeline = True
dist_strategy.pipeline_configs = {"micro_batch": 1, }
def _get_local_rank(self, current_endpoint, endpoints):
cur_node_endpoints = []
cur_ip = current_endpoint.split(':')[0].strip()
for ep in endpoints:
if cur_ip == ep.split(':')[0].strip():
cur_node_endpoints.append(ep)
return cur_node_endpoints.index(current_endpoint)
def minimize_impl(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
optimize_ops, params_grads, prog_list = \
self.wrapped_opt.minimize(loss, startup_program,
parameter_list, no_grad_set)
if self.role_maker._worker_num() == 1:
return optimize_ops, params_grads
endpoints = self.role_maker._get_trainer_endpoints()
current_endpoint = endpoints[self.role_maker._worker_index()]
self.local_rank = self._get_local_rank(current_endpoint, endpoints)
self.wrapped_opt = PO(self.inner_opt,
num_microbatches=self.num_microbatches,
start_cpu_core_id=self.local_rank)
node_num = _get_node_num(endpoints)
gpus_per_node = len(endpoints) // node_num
self.startup_program = startup_program
self.local_rank = self._get_local_rank(current_endpoint, endpoints)
if startup_program is None:
self.startup_program = fluid.default_startup_program()
loss.block.program._pipeline_opt = dict()
loss.block.program._pipeline_opt['local_rank'] = self.local_rank
optimize_ops, params_grads, prog_list = \
self.wrapped_opt.minimize(loss, startup_program,
parameter_list, no_grad_set)
assert prog_list
self.main_program_list = prog_list
self.main_program = loss.block.program
self.inner_parallelism = loss.block.program._pipeline_opt[
'inner_parallelism']
nranks = len(endpoints)
self.nranks = nranks
self.nrings = len(self.main_program_list)
......@@ -146,24 +206,26 @@ class PipelineOptimizer(MetaOptimizerBase):
self.endpoints = endpoints
self.current_endpoint = current_endpoint
pipeline_helper = PipelineHelper(self.role_maker, nrings=self.nrings)
pipeline_helper.update_startup_program(self.startup_program)
pipeline_helper = PipelineHelper(self.role_maker)
pipeline_helper.update_startup_program(
self.startup_program._pipeline_opt["startup_program"],
self.inner_parallelism)
self._transpile_main_program()
self._transpile_main_program(loss, node_num, gpus_per_node)
return optimize_ops, params_grads
def _transpile_main_program(self):
self._insert_loss_grad_ops()
for ring_id in range(self.nrings):
def _transpile_main_program(self, loss, node_num, gpus_per_node):
self._insert_loss_grad_ops(loss, gpus_per_node, node_num)
for ring_id in range(1, gpus_per_node + 1):
self._insert_allreduce_ops(ring_id)
def _insert_loss_grad_ops(self):
def _insert_loss_grad_ops(self, loss, gpus_per_node, node_num):
"""
In order to keep the learning rate consistent in different numbers of
training workers, we scale the loss grad by the number of workers
"""
block = self.main_program_list[self.nrings - 1]['program'].global_block(
)
block = self.main_program_list[gpus_per_node - 1][
'program'].global_block()
for idx, op in reversed(list(enumerate(block.ops))):
if is_loss_grad_op(op):
loss_grad_var = block.vars[op.output_arg_names[0]]
......@@ -173,12 +235,12 @@ class PipelineOptimizer(MetaOptimizerBase):
inputs={'X': loss_grad_var},
outputs={'Out': loss_grad_var},
attrs={
'scale': 1.0 / self.nranks,
'scale': 1.0 / node_num,
OP_ROLE_KEY: OpRole.Backward
})
def _insert_allreduce_ops(self, ring_id):
block = self.main_program_list[ring_id]['program'].global_block()
block = self.main_program_list[ring_id - 1]['program'].global_block()
origin_block = self.main_program.global_block()
grad = None
for idx, op in reversed(list(enumerate(block.ops))):
......
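For reference, a hedged sketch of how the rewritten fleet meta optimizer above would typically be engaged. The model code, the fleet entry points (paddle.distributed.fleet), and the one-process-per-GPU launch are assumptions based on the 2.0-era collective API rather than something this diff prescribes; the "micro_batch" key matches the one read in _set_basic_info above.

```python
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet

paddle.enable_static()
fleet.init(is_collective=True)

with fluid.device_guard("gpu:0"):
    x = fluid.layers.data(name='x', shape=[1], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    hidden = fluid.layers.fc(input=x, size=16)
with fluid.device_guard("gpu:1"):
    pred = fluid.layers.fc(input=hidden, size=1)
    loss = fluid.layers.reduce_mean(
        fluid.layers.square_error_cost(input=pred, label=y))

strategy = fleet.DistributedStrategy()
strategy.pipeline = True
strategy.pipeline_configs = {"micro_batch": 2}  # key read in _set_basic_info above

optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(loss)
# One process per GPU (e.g. launched with fleetrun); each process then feeds its
# own section program to Executor.run(), as shown in the executor.py hunk below.
```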
......@@ -413,24 +413,16 @@ class Section(DeviceWorker):
section_param = trainer_desc.section_param
section_param.num_microbatches = pipeline_opt["num_microbatches"]
section_param.start_cpu_core_id = pipeline_opt["start_cpu_core_id"]
for i, program in enumerate(pipeline_opt["section_program_list"]):
cfg = section_param.section_config.add()
cfg = section_param.section_config
program = pipeline_opt["section_program"]
cfg.program_desc.ParseFromString(program["program"]._get_desc()
.serialize_to_string())
# TODO: why does not work
# cfg.program_desc.CopyFrom(program.program._get_desc())
place = pipeline_opt["place_list"][i]
place_id = pipeline_opt["place_id_list"][i]
if isinstance(place, core.CPUPlace):
cfg.place = cfg.CPUPlace
elif isinstance(place, core.CUDAPlace):
place = pipeline_opt["place"]
place_id = pipeline_opt["place_id"]
assert isinstance(place, core.CUDAPlace)
cfg.place = cfg.CUDAPlace
elif isinstance(place, core.CUDAPinnedPlace):
cfg.place = cfg.CUDAPinnedPlace
else:
raise NotImplementedError(
"SectionWorker only supports CPUPlace, CUDAPlace and CUDAPinnedPlace now."
)
cfg.place_id = place_id
......
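The Section device worker above now reads a single section config from the program's _pipeline_opt. As a rough illustration, the dict it consumes would look like the following; the key names are taken from this diff, while the values are placeholders.

```python
import paddle.fluid as fluid
from paddle.fluid import core

section_program = fluid.Program()  # stands in for the per-device section program
pipeline_opt = {
    "num_microbatches": 2,                            # -> section_param.num_microbatches
    "start_cpu_core_id": 0,                           # -> section_param.start_cpu_core_id
    "section_program": {"program": section_program},  # single program per process now
    "place": core.CUDAPlace(0),                       # must be a CUDAPlace
    "place_id": 0,                                    # -> cfg.place_id
}
```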
......@@ -561,6 +561,7 @@ class Executor(object):
self._default_executor = core.Executor(p)
self._closed = False
self.pruned_program_scope_caches = dict()
self._prepare_to_run_called = False
self._auto_checkpoint_name = unique_name.generate(
"__auto_checkpoint_executor__")
......@@ -1115,6 +1116,24 @@ class Executor(object):
use_default_main_program = program is None
if program is None:
program = default_main_program()
if fetch_list is not None:
if isinstance(fetch_list, Variable) or isinstance(
fetch_list, str) or isinstance(fetch_list,
six.string_types):
fetch_list = [fetch_list]
assert isinstance(fetch_list, tuple) or isinstance(fetch_list, list), \
"Currently , The fetch_list type only should be list or tuple, \n"\
"but the input type is {}. For more information please refer to \n"\
"the executor.run(...).".format(type(fetch_list))
else:
fetch_list = []
if isinstance(program, Program) and program._pipeline_opt:
if "startup_program" in program._pipeline_opt:
program = program._pipeline_opt["startup_program"]
else:
return self.train_from_dataset(program, fetch_list=fetch_list)
if isinstance(program, Program) and \
len(program.global_block().ops) == 0:
if use_default_main_program:
......@@ -1131,18 +1150,6 @@ class Executor(object):
if scope is None:
scope = global_scope()
if fetch_list is not None:
if isinstance(fetch_list, Variable) or isinstance(
fetch_list, str) or isinstance(fetch_list,
six.string_types):
fetch_list = [fetch_list]
assert isinstance(fetch_list, tuple) or isinstance(fetch_list, list), \
"Currently , The fetch_list type only should be list or tuple, \n"\
"but the input type is {}. For more information please refer to \n"\
"the executor.run(...).".format(type(fetch_list))
else:
fetch_list = []
# use_prune can be overrided by putting optimize_ops in fetch_list
_origin_fetch_list = fetch_list
_origin_program = program
......@@ -1449,6 +1456,25 @@ class Executor(object):
raise RuntimeError("dataset is need and should be initialized")
dataset._prepare_to_run()
real_fetch_list = []
if program._pipeline_opt:
real_program = program._pipeline_opt["section_program"]['program']
for fetch_var in fetch_list:
if isinstance(fetch_var, Variable):
fetch_var_name = fetch_var.name
else:
fetch_var_name = fetch_var
if fetch_var_name in real_program.global_block().vars:
real_fetch_list.append(fetch_var)
program._pipeline_opt["section_program"][
'program'] = self._add_feed_fetch_ops(
program=program._pipeline_opt["section_program"]['program'],
feed=[],
fetch_list=real_fetch_list,
feed_var_name='feed',
fetch_var_name='fetch')
fetch_list = None
scope, trainer = self._prepare_trainer(
program=program,
......@@ -1483,6 +1509,10 @@ class Executor(object):
dataset._dynamic_adjust_after_train()
dataset._finish_to_run()
if real_fetch_list:
arr = scope.find_var('fetch').get_fetch_list()
tensors = arr._move_to_list()
return as_numpy(tensors)
return None
......
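Continuing the usage sketch from the top of this diff (same assumed exe, loss, and data_loader), the dispatch and fetch handling added above mean per-step results can now be fetched straight from Executor.run(); only fetch variables that exist in the local section program are returned, otherwise the call yields None.

```python
import numpy as np

data_loader.start()
fetched = exe.run(fluid.default_main_program(), fetch_list=[loss.name])
data_loader.reset()
if fetched is not None:  # None when no fetch variable lives in this section program
    print("loss:", float(np.array(fetched[0]).mean()))
```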
......@@ -3743,15 +3743,9 @@ class PipelineOptimizer(object):
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
batch_size = 1
filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
dataset.set_use_var([x,y])
dataset.set_batch_size(batch_size)
dataset.set_filelist(filelist)
data_loader.start()
exe.train_from_dataset(
fluid.default_main_program(),
dataset)
fluid.default_main_program())
data_loader.reset()
"""
......@@ -3769,7 +3763,7 @@ class PipelineOptimizer(object):
"num_microbatches must be a positive value.")
self._num_microbatches = num_microbatches
assert start_cpu_core_id >= 0, (
"start_cpu_core_id must be greater than or equal to 0.")
"start_cpu_core_id must be a non-negative integer.")
self._start_cpu_core_id = start_cpu_core_id
self._place_list = None
op_maker = core.op_proto_and_checker_maker
......@@ -3777,7 +3771,7 @@ class PipelineOptimizer(object):
self._op_role_key = op_maker.kOpRoleAttrName()
self._op_role_var_key = op_maker.kOpRoleVarAttrName()
self._op_device_key = op_maker.kOpDeviceAttrName()
self._param_device_map = dict()
self._param_device_map = None
def _create_vars(self, block, main_program):
# Create vars for block, copied from main_program's global block
......@@ -3793,7 +3787,10 @@ class PipelineOptimizer(object):
used_var_set.add(var)
source_var = main_program.block(0).var(str(var))
if source_var.type == core.VarDesc.VarType.READER:
block.create_var(name=var, type=core.VarDesc.VarType.READER)
block.create_var(
name=var,
type=core.VarDesc.VarType.READER,
persistable=source_var.persistable)
else:
block._clone_variable(source_var, False)
......@@ -3816,28 +3813,48 @@ class PipelineOptimizer(object):
return 'Param' in op.input_names and 'Grad' in op.input_names and (
"LearningRate" in op.input_names)
def _split_program(self, main_program):
def _split_program(self, main_program, devices):
"""
Split a program into sections according to devices that ops run on.
The ops of the role LRSched are copied to all sections.
Args:
main_program (Program): the main program
devices: all used devices
"""
programs = []
# Map from device to its corresponding section program info
device_program_map = dict()
block = main_program.block(0)
for device in devices:
p = {'program': Program()}
device_program_map[device] = p
block = main_program.block(0)
for op in block.ops:
device = op.attr(self._op_device_key)
if device not in device_program_map:
program = {"program": Program()}
device_program_map[device] = program
op_role = op.attr(self._op_role_key)
if int(op_role) & int(self._op_role.LRSched):
# Copy ops of the role LRSched to all sections.
for device in device_program_map.keys():
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
elif op.type == "create_py_reader" or op.type == "read":
# Copy read related ops to all sections to make them exit after each epoch.
for device in device_program_map.keys():
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
else:
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
for key in sorted(device_program_map.keys()):
program = device_program_map[key]
......@@ -3846,6 +3863,24 @@ class PipelineOptimizer(object):
return programs
def _split_startup_program(self, startup_program, local_rank):
block = startup_program.block(0)
new_startup_program = Program()
for op in block.ops:
device = op.attr(self._op_device_key)
if device:
device_index = int(device.split(":")[1])
else:
device_index = None
if device_index is not None and device_index != local_rank: continue
op_desc = op.desc
ap_op = new_startup_program.block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, "")
new_startup_program._sync_with_cpp()
self._create_vars(new_startup_program.block(0), startup_program)
return new_startup_program
def _find_post_op(self, ops, cur_op, var_name):
"""
Find the real post op that has variable named var_name as input.
......@@ -3867,9 +3902,8 @@ class PipelineOptimizer(object):
for in_var_name in op.input_arg_names:
if in_var_name == var_name:
post_op.append(op)
break
if post_op:
if not len(post_op) == 1:
raise ValueError("Each op can only have one post op.")
return post_op[0]
return None
......@@ -3885,6 +3919,8 @@ class PipelineOptimizer(object):
"""
prev_op = []
for op in ops:
if op.type == 'send_v2' or op.type == 'recv_v2':
continue
if op == cur_op:
break
for out_var_name in op.output_arg_names:
......@@ -3923,61 +3959,27 @@ class PipelineOptimizer(object):
def _get_data_var_info(self, block):
"""
Get all vars whose is_data attribute is true and then rename them.
For PipelineTrainer, all data vars are bound to the
minibatch scope, so we have to feed them to the microbatch
to avoid conflicts. The vars fed to the microbatch have to
be renamed.
Get info of all vars whose is_data attribute is true.
"""
# A map from var name to the renamed name.
raw_name_new_name_map = dict()
# Because we will create vars in block, it is more safe
# to get all var_names before iteration.
var_names = list(block.vars.keys())
for var_name in var_names:
var = block.var(var_name)
if not var.is_data:
continue
assert var_name not in raw_name_new_name_map, (
"{} has already been processed.".format(var_name))
new_name = unique_name.generate(var_name)
raw_name_new_name_map[var_name] = new_name
new_var = self._create_var(block, var, new_name)
new_var.is_data = False
# map of data to the devices that data is on
# map of data vars to the devices that data is on
data_devices_map = dict()
for op in block.ops:
dev_spec = op.attr(self._op_device_key)
for var_name in op.input_arg_names:
if var_name not in raw_name_new_name_map:
if "blocking_queue" in var_name: continue
var = block.var(var_name)
if not var.is_data:
continue
if not var_name in data_devices_map:
data_devices_map[var_name] = []
if not dev_spec in data_devices_map[var_name]:
data_devices_map[var_name].append(dev_spec)
new_name = raw_name_new_name_map[var_name]
#self._rename_arg(op, var_name, new_name)
return data_devices_map, raw_name_new_name_map
def _rename_var_in_block(self, block, raw_name_new_name_map):
"""
Rename vars whose names in raw_name_new_name_map to the corresponding
new names.
"""
for op in block.ops:
if op.type == "enqueue" or op.type == "dequeue":
continue
for var_name in op.input_arg_names:
if var_name in raw_name_new_name_map:
new_name = raw_name_new_name_map[var_name]
self._rename_arg(op, var_name, new_name)
return data_devices_map
def _insert_enq_deq_for_data_var(self, main_block, programs, startup,
def _insert_sendrecv_for_data_var(self, main_block, programs, startup,
devices):
"""
Insert enqueue and dequeue ops for data var
Insert send and recv ops for data vars that are on other devices.
Args:
main_block (Block): Global block for main program
......@@ -3986,48 +3988,34 @@ class PipelineOptimizer(object):
devices (list): List of devices in the format (dev:dev_index)
"""
main_program = main_block.program
data_devices_map, raw_name_new_name_map = self._get_data_var_info(
main_block)
data_devices_map = self._get_data_var_info(main_block)
first_prog = programs[0]['program']
first_block = first_prog.block(0)
enqueue_index = 0
if first_block.ops[0].type == "create_py_reader" or (
first_block.ops[1].type == "create_py_reader"):
insert_index = 0
for op in first_block.ops:
insert_index += 1
if op.type == "read":
enqueue_index += 1
break
enqueue_index += 1
first_dev_spec = devices[0]
first_dev_index = int(first_dev_spec.split(':')[1])
for var_name in data_devices_map.keys():
for device in data_devices_map[var_name]:
# step1: generate queue for each pair of data var and device
# that the data is on
queue_name = var_name + "_blocking_queue"
queue_name = unique_name.generate(queue_name)
queue_var = startup.block(0).create_var(
name=queue_name,
persistable=True,
type=core.VarDesc.VarType.RAW)
startup.block(0).append_op(
type='queue_generator',
attrs={
'names': [queue_name],
'capacity': self._num_microbatches
})
if device == first_dev_spec: continue
main_var = main_block.var(var_name)
assert main_var.is_data
if not var_name in first_block.vars:
self._create_var(first_block, main_var, var_name)
dev_index = int(device.split(':')[1])
first_block._insert_op(
index=enqueue_index,
type='enqueue',
index=insert_index,
type='send_v2',
inputs={'X': first_block.var(var_name)},
attrs={
'queue_name': queue_name,
self._op_device_key: first_dev_spec,
self._op_role_key: self._op_role.Forward
self._op_role_key: self._op_role.Forward,
'use_calc_stream': True,
'peer': dev_index,
})
# Get the device that the data is on
assert device in devices
......@@ -4035,21 +4023,24 @@ class PipelineOptimizer(object):
prog = programs[prog_index]['program']
block = prog.block(0)
index = 0
if device == first_dev_spec:
index = enqueue_index + 1
new_name = raw_name_new_name_map[var_name]
for op in block.ops:
index += 1
if op.type == "read":
break
source_var = main_program.block(0).var(var_name)
new_var = self._create_var(block, source_var, new_name)
new_var = self._create_var(block, source_var, var_name)
block._insert_op(
index=index,
type='dequeue',
type='recv_v2',
outputs={'Out': [new_var]},
attrs={
'out_shape': new_var.shape,
'dtype': new_var.dtype,
self._op_device_key: device,
self._op_role_key: self._op_role.Forward,
'queue_name': queue_name,
'peer': first_dev_index,
'use_calc_stream': True,
})
self._rename_var_in_block(block, raw_name_new_name_map)
def _strip_grad_suffix(self, name):
"""
......@@ -4064,18 +4055,6 @@ class PipelineOptimizer(object):
"""
return name + core.grad_var_suffix()
def _update_param_device_map(self, params_grads, block):
for param_grad in params_grads:
if not param_grad[0].trainable: continue
param_name = param_grad[0].name
ops = block.ops
for op in ops:
input_arg_names = op.input_arg_names
if param_name in input_arg_names:
self._param_device_map[param_name] = op.attr(
self._op_device_key)
break
def _add_opdevice_attr_for_regularization_clip(self, block):
"""
Add op_device attribute for regularization and clip ops.
......@@ -4090,7 +4069,7 @@ class PipelineOptimizer(object):
assert self._op_role_var_key in op.attr_names
op_role_var = op.all_attrs()[self._op_role_var_key]
assert len(op_role_var) == 2
param_name = block.vars[op_role_var[0]].name
param_name = op_role_var[0]
device = self._param_device_map[param_name]
op._set_attr(self._op_device_key, device)
......@@ -4159,32 +4138,37 @@ class PipelineOptimizer(object):
"{} has not been set.".format(op.type))
if not dev_spec in device_specs:
device_specs.append(dev_spec)
sorted_device_specs = sorted(device_specs)
assert sorted_device_specs == device_specs
return device_specs
def _insert_enq_deq_ops_for_boundaries(self, block, origin_block,
startup_program):
def _insert_sendrecv_ops_for_boundaries(self, block):
"""
Insert a pair of enqueue and dequeue ops for every two
Insert a pair of send and recv ops for every two
consecutive ops on different devices.
"""
startup_block = startup_program.global_block()
extra_index = 0
# A map from var to device spec where op takes it as input,
# avoiding multiple enqueue and dequeue ops.
# avoiding multiple send and recv ops.
var_devspec = dict()
for index, op in list(enumerate(origin_block.ops)):
for index, op in enumerate(list(block.ops)):
# skips lr-related ops and vars, as we will process them later.
if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched):
continue
# skips update ops and vars, as we will process them later.
if self._is_update_op(op): continue
cur_device_spec = op.attr(self._op_device_key)
for var_name in op.input_arg_names:
# i.e., lod_tensor_blocking_queue created by DataLoader,
# which only exists in startup program.
if not var_name in origin_block.vars: continue
if not var_name in block.vars: continue
var = block.var(var_name)
# skip data, because we will process it later
if var.is_data: continue
prev_op = self._find_real_prev_op(origin_block.ops, op,
var_name)
prev_op = self._find_real_prev_op(block.ops, op, var_name)
if prev_op is None:
continue
prev_device_spec = prev_op.attr(self._op_device_key)
......@@ -4195,118 +4179,64 @@ class PipelineOptimizer(object):
if cur_device_spec in var_devspec[var_name]: continue
var_devspec[var_name].append(cur_device_spec)
queue_name = var_name + "_blocking_queue"
queue_name = unique_name.generate(queue_name)
queue_var = startup_block.create_var(
name=queue_name,
persistable=True,
type=core.VarDesc.VarType.RAW)
startup_block.append_op(
type='queue_generator',
attrs={
'names': [queue_name],
'capacity': self._num_microbatches
})
op_role = op.all_attrs()[self._op_role_key]
var = block.vars[var_name]
prev_device_index = int(prev_device_spec.split(':')[1])
cur_device_index = int(cur_device_spec.split(':')[1])
block._insert_op(
index=index + extra_index,
type='enqueue',
type='send_v2',
inputs={'X': var},
attrs={
'queue_name': queue_name,
self._op_device_key: prev_device_spec,
self._op_role_key: op_role
self._op_role_key: op_role,
'use_calc_stream': True,
'peer': cur_device_index,
})
extra_index += 1
block._insert_op(
index=index + extra_index,
type='dequeue',
type='recv_v2',
outputs={'Out': [var]},
attrs={
'out_shape': var.shape,
'dtype': var.dtype,
self._op_device_key: cur_device_spec,
'queue_name': queue_name,
self._op_role_key: op_role
self._op_role_key: op_role,
'use_calc_stream': True,
'peer': prev_device_index,
})
extra_index += 1
def _add_dequeue_ops_for_optimize(self, block, startup_program):
startup_block = startup_program.global_block()
grad_queue_map = dict()
grad_device_map = dict()
optimize_index = None
grad_names_to_dequeue = []
for index, op in reversed(list(enumerate(block.ops))):
device = op.attr(self._op_device_key)
# Optimizer pass
if not self._is_optimize_op(op):
optimize_index = index + 1
break
if not self._is_update_op(op): continue
assert self._op_role_var_key in op.attr_names
op_role_var = op.all_attrs()[self._op_role_var_key]
assert len(op_role_var) == 2
grad_name = op_role_var[1]
assert grad_name not in grad_device_map
assert grad_name not in grad_names_to_dequeue
grad_device_map[grad_name] = device
grad_names_to_dequeue.append(grad_name)
for grad_name in grad_names_to_dequeue:
device = grad_device_map[grad_name]
grad_names = []
grads = []
queue_name = grad_name + "_blocking_queue"
queue_name = unique_name.generate(queue_name)
grad_queue_map[grad_name] = queue_name
ref_var = block.vars[grad_name]
queue_var = startup_block.create_var(
name=queue_name,
persistable=True,
type=core.VarDesc.VarType.RAW)
startup_block.append_op(
type='queue_generator',
attrs={
'names': [queue_name],
'capacity': self._num_microbatches
})
orig_var_name = self._strip_grad_suffix(grad_name)
for _ in range(self._num_microbatches):
u_name = unique_name.generate(orig_var_name)
u_grad_name = self._append_grad_suffix(u_name)
grad_var = self._create_var(block, ref_var, u_grad_name)
grad_names.append(u_grad_name)
grads.append(grad_var)
block._insert_op(
index=optimize_index,
type='dequeue',
outputs={'Out': grads},
attrs={
self._op_device_key: device,
'queue_name': queue_name,
self._op_role_key: self._op_role.Optimize
})
block._insert_op(
index=optimize_index + 1,
type='sum',
inputs={'X': grad_names},
outputs={'Out': ref_var},
def _clear_gradients(self, main_block, dev_spec):
"""
Clear gradients at the beginning of each mini-batch run.
"""
for param_name in self._param_device_map:
device = self._param_device_map[param_name]
if device != dev_spec: continue
grad_name = self._append_grad_suffix(param_name)
grad_var = main_block.vars[grad_name]
main_block._insert_op(
index=0,
type='fill_constant',
inputs={},
outputs={'Out': [grad_var]},
attrs={
'shape': grad_var.shape,
'dtype': grad_var.dtype,
'value': float(0),
self._op_device_key: device,
self._op_role_key: self._op_role.Optimize
# A trick: the LRSched role makes this op run once per mini-batch
self._op_role_key: self._op_role.Optimize.LRSched,
})
return grad_queue_map
def _insert_enq_deq_ops_for_update(self, block, startup_program):
def _accumulate_gradients(self, block):
"""
Insert enqueue and dequeue ops for gradients of parameters.
Accumulate the gradients generated by each micro-batch into the mini-batch gradient.
The loss is also scaled by the number of micro-batches.
"""
startup_block = startup_program.global_block()
grad_queue_map = self._add_dequeue_ops_for_optimize(block,
startup_program)
for index, op in reversed(list(enumerate(block.ops))):
for index, op in reversed(tuple(enumerate(list(block.ops)))):
offset = index
device = op.attr(self._op_device_key)
......@@ -4332,19 +4262,23 @@ class PipelineOptimizer(object):
if len(op_role_var) == 0:
continue
assert len(op_role_var) % 2 == 0
offset = index
for i in range(0, len(op_role_var), 2):
grad_name = op_role_var[i + 1]
grad_var = block.vars[grad_name]
assert grad_name in grad_queue_map
queue_name = grad_queue_map[grad_name]
new_grad_var_name = unique_name.generate(grad_name)
new_var = self._create_var(block, grad_var,
new_grad_var_name)
self._rename_arg(op, grad_name, new_grad_var_name)
block._insert_op(
index=offset + 1,
type='enqueue',
inputs={'X': block.vars[grad_name]},
type='sum',
inputs={'X': [grad_var, new_var]},
outputs={'Out': grad_var},
attrs={
'queue_name': queue_name,
self._op_device_key: device,
self._op_role_key: self._op_role.Backward
self._op_role_key: self._op_role.Backward,
self._op_role_var_key: op_role_var
})
offset += 1
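Taken together with _clear_gradients above, the per-mini-batch effect of these two passes can be illustrated with plain numpy (a sketch of the semantics only, not of the actual ops):
import numpy as np

num_microbatches = 4
# _clear_gradients: the prepended fill_constant(value=0) zeroes the mini-batch
# gradient once per mini-batch (the LRSched role is what limits it to once).
minibatch_grad = np.zeros([4, 8], dtype='float32')
for _ in range(num_microbatches):
    # stands in for the gradient produced by one micro-batch backward pass
    micro_grad = np.random.rand(4, 8).astype('float32')
    # _accumulate_gradients: the inserted `sum` op adds the renamed per-micro-batch
    # gradient into the mini-batch gradient
    minibatch_grad += micro_grad
# The optimizer ops then consume `minibatch_grad` exactly once per mini-batch.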
......@@ -4401,7 +4335,9 @@ class PipelineOptimizer(object):
for prog in var_info[var_name]:
block = prog.block(0)
for op in block.ops:
if op.type == "dequeue": continue
if op.type == "recv_v2" or op.type == "create_py_reader" or \
op.type == "read":
continue
# We have processed lr related vars
if op.attr(self._op_role_key) == int(
self._op_role.Optimize.LRSched):
......@@ -4421,45 +4357,39 @@ class PipelineOptimizer(object):
write_prog = write_info[var_name]
write_block = write_prog.block(0)
write_device = self._get_device_info(write_block)
write_dev_index = int(write_device.split(':')[1])
all_progs = var_info[var_name]
for prog in all_progs:
if prog == write_prog: continue
read_block = prog.block(0)
read_device = self._get_device_info(read_block)
read_dev_index = int(read_device.split(':')[1])
queue_name = var_name + "_blocking_queue"
queue_name = unique_name.generate(queue_name)
queue_var = startup_prog.block(0).create_var(
name=queue_name,
persistable=True,
type=core.VarDesc.VarType.RAW)
startup_prog.block(0).append_op(
type='queue_generator',
attrs={
'names': [queue_name],
'capacity': self._num_microbatches
})
write_block._insert_op(
index=0,
type='enqueue',
type='send_v2',
inputs={'X': write_block.var(var_name), },
attrs={
'queue_name': queue_name,
self._op_device_key: write_device,
'use_calc_stream': True,
# A trick: use the LRSched role so this copy runs once per
# mini-batch instead of once per micro-batch
self._op_role_key: self._op_role.LRSched
self._op_role_key: self._op_role.LRSched,
'peer': read_dev_index,
})
read_block = prog.block(0)
read_device = self._get_device_info(read_block)
read_block._insert_op(
index=0,
type='dequeue',
type='recv_v2',
outputs={'Out': [read_block.var(var_name)]},
attrs={
'out_shape': read_block.var(var_name).shape,
'dtype': read_block.var(var_name).dtype,
self._op_device_key: read_device,
'use_calc_stream': True,
# A trick: use the LRSched role so this copy runs once per
# mini-batch instead of once per micro-batch
self._op_role_key: self._op_role.LRSched,
'queue_name': queue_name,
'peer': write_dev_index
})
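To make the bookkeeping above concrete, a small sketch of what write_info and var_info hold for a typical case (the variable name and the string placeholders are illustrative; the real values are Program objects of the sections):
# Illustrative contents only
write_info = {"learning_rate_0": "program of the section that writes the var"}
var_info = {"learning_rate_0": ["program of the writing section",
                                "program of another section that reads it"]}
# For every reading section other than the writer, the pass inserts a send_v2 on
# the writer's device and a recv_v2 on the reader's device, both tagged with the
# LRSched role so the copy happens once per mini-batch rather than once per
# micro-batch.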
def minimize(self,
......@@ -4472,26 +4402,21 @@ class PipelineOptimizer(object):
startup_program = default_startup_program()
optimize_ops, params_grads = self._optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set)
self._update_param_device_map(params_grads, main_block)
self._param_device_map = self._optimizer._param_device_map
# Step1: add default op_device attribute for regularization and clip ops
self._add_opdevice_attr_for_regularization_clip(main_block)
# Step2: add default op_device attribute for ops whose op_device
# attribute have not been set yet.
# attribute have not been set yet. Then check all ops have the
# op_device attribute.
self._add_default_opdevice_attr(main_block)
device_specs = self._check_validation(main_block)
# Step3: add enqueue and dequeue ops between section boundaries
origin_prog = main_block.program.clone(for_test=False)
origin_main_block = origin_prog.global_block()
self._insert_enq_deq_ops_for_boundaries(main_block, origin_main_block,
startup_program)
# Step4: add a pair of enqueue and dequeueN for parameter gradients
self._insert_enq_deq_ops_for_update(main_block, startup_program)
device_specs = self._check_validation(main_block)
assert len(device_specs) > 1
main_program = main_block.program
# Step3: add send and recv ops between section boundaries
self._insert_sendrecv_ops_for_boundaries(main_block)
place_list = []
place_id_list = []
......@@ -4506,37 +4431,56 @@ class PipelineOptimizer(object):
else:
raise ValueError("Unknown device type: %s" % dev_spec)
# Step5: split program into sections and add pairs of
# enqueue and dequeue ops for data var.
if len(place_list) == 0:
program_list = []
ptmp = {
"program": main_program,
"input_set": set(),
"output_set": set()
}
program_list.append(ptmp)
else:
program_list = self._split_program(main_program)
# Step4: split program into sections and add pairs of
# send and recv ops for data var.
main_program = main_block.program
program_list = self._split_program(main_program, device_specs)
for p in program_list:
self._create_vars(p["program"].block(0), main_program)
self._insert_enq_deq_for_data_var(main_block, program_list,
self._insert_sendrecv_for_data_var(main_block, program_list,
startup_program, device_specs)
# Step6: Special Case: process persistable vars that exist in
# Step5: Special Case: process persistable vars that exist in
# multiple sections
self._process_persistable_vars_in_multi_sections(
main_program, startup_program, program_list)
# Step7: Add sub blocks for section programs
# Step6: Add sub blocks for section programs
self._add_sub_blocks(main_block, program_list)
assert (main_program._pipeline_opt and
isinstance(main_program._pipeline_opt, dict) and
'local_rank' in main_program._pipeline_opt), \
"You must use pipeline with fleet"
local_rank = main_program._pipeline_opt['local_rank']
# Step7: Split startup program
new_startup_program = self._split_startup_program(startup_program,
local_rank)
# Step8: clear gradients before each mini-batch and
# accumulate gradients during backward
self._clear_gradients(
program_list[local_rank]['program'].global_block(),
dev_spec=device_specs[local_rank])
self._accumulate_gradients(program_list[local_rank]['program']
.global_block())
with open("startup_prog_%d" % local_rank, 'w') as f:
f.writelines(str(new_startup_program))
with open("main_prog_%d" % local_rank, 'w') as f:
f.writelines(str(program_list[local_rank]['program']))
startup_program._pipeline_opt = {
"startup_program": new_startup_program,
}
main_program._pipeline_opt = {
"trainer": "PipelineTrainer",
"device_worker": "Section",
"section_program_list": program_list,
"place_list": place_list,
"place_id_list": place_id_list,
"inner_parallelism": len(device_specs),
"section_program": program_list[local_rank],
"place": place_list[local_rank],
"place_id": place_id_list[local_rank],
"sync_steps": -1,
"num_microbatches": self._num_microbatches,
"start_cpu_core_id": self._start_cpu_core_id,
......
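This is the point of the change: after minimize(), each rank keeps only its own section program and place in _pipeline_opt, so an ordinary Executor.run() call drives the pipeline. A minimal end-to-end sketch assembled from the tests further down (sizes, names and step counts are illustrative; the script must be started as one process per stage with the proper fleet environment, which the test harness below sets up itself):
import os
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.distributed.fleet as fleet

paddle.enable_static()
fleet.init(is_collective=True)

with fluid.device_guard("gpu:0"):
    x = fluid.layers.data(name='x', shape=[32], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='int64')
    loader = fluid.io.DataLoader.from_generator(
        feed_list=[x, y], capacity=64, use_double_buffer=False, iterable=False)
    fc_1 = fluid.layers.fc(input=x, size=64, act='tanh')
with fluid.device_guard("gpu:1"):
    prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
    cost = fluid.layers.cross_entropy(input=prediction, label=y)
    avg_cost = fluid.layers.mean(x=cost)

strategy = fleet.DistributedStrategy()
strategy.pipeline = True
opt = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
fleet.distributed_optimizer(optimizer=opt, strategy=strategy).minimize(avg_cost)

def sample_reader():
    for _ in range(16):
        yield np.random.random([32]).astype('float32'), \
            np.random.randint(0, 2, size=[1]).astype('int64')

device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(device_id)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
loader.set_sample_list_generator(paddle.batch(sample_reader, batch_size=4), place)
loader.start()
for step in range(4):
    loss = exe.run(fluid.default_main_program(), fetch_list=[avg_cost])
    loss = loss[0] if loss else None  # only the last stage fetches a real loss value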
......@@ -10,10 +10,12 @@ if(NOT WITH_NCCL)
endif()
string(REPLACE ".py" "" DIST_TEST_OPS "${DIST_TEST_OPS}")
list(APPEND DIST_TEST_OPS test_parallel_dygraph_mnist)
list(APPEND DIST_TEST_OPS test_pipeline)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_se_resnext)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding_over_height)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer)
list(APPEND DIST_TEST_OPS test_fleet_pipeline_meta_optimizer)
list(APPEND DIST_TEST_OPS test_listen_and_serv_op)
list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer)
set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS})
......@@ -146,7 +148,6 @@ if (WITH_NCCL)
endif()
if(NOT WITH_GPU OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_pipeline)
LIST(REMOVE_ITEM TEST_OPS test_boxps)
endif()
list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290
......@@ -469,7 +470,6 @@ if(WITH_DISTRIBUTE)
py_test_modules(test_fleet_sharding_meta_optimizer MODULES test_fleet_sharding_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_amp_meta_optimizer MODULES test_fleet_amp_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_fp16_allreduce_meta_optimizer MODULES test_fleet_fp16_allreduce_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_pipeline_meta_optimizer MODULES test_fleet_pipeline_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_private_function MODULES test_fleet_private_function ENVS ${dist_ENVS})
py_test_modules(test_fleet_meta_optimizer_base MODULES test_fleet_meta_optimizer_base ENVS ${dist_ENVS})
py_test_modules(test_fleet_distributed_strategy MODULES test_fleet_distributed_strategy)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import numpy as np
import argparse
import time
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
from paddle.fluid import core
import unittest
from multiprocessing import Process
import os
import signal
from functools import reduce
from test_dist_base import TestDistRunnerBase, runtime_main
import paddle.distributed.fleet as fleet
paddle.enable_static()
DTYPE = "float32"
paddle.dataset.mnist.fetch()
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
def cnn_model(data):
conv_pool_1 = fluid.nets.simple_img_conv_pool(
input=data,
filter_size=5,
num_filters=20,
pool_size=2,
pool_stride=2,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.01)))
conv_pool_2 = fluid.nets.simple_img_conv_pool(
input=conv_pool_1,
filter_size=5,
num_filters=50,
pool_size=2,
pool_stride=2,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
value=0.01)))
SIZE = 10
input_shape = conv_pool_2.shape
param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE]
scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5
predict = fluid.layers.fc(
input=conv_pool_2,
size=SIZE,
act="softmax",
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Constant(value=0.01)))
return predict
class TestDistMnist2x2(TestDistRunnerBase):
def get_model(self, batch_size=2, use_dgc=False, dist_strategy=None):
# Input data
with fluid.device_guard("gpu:0"):
images = fluid.layers.data(
name='pixel', shape=[1, 28, 28], dtype=DTYPE)
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if dist_strategy:
data_loader = fluid.io.DataLoader.from_generator(
feed_list=[images, label],
capacity=64,
use_double_buffer=False,
iterable=False)
# Train program
predict = cnn_model(images)
with fluid.device_guard("gpu:1"):
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
with fluid.device_guard("gpu:1"):
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(
input=predict, label=label, total=batch_size_tensor)
inference_program = fluid.default_main_program().clone()
base_lr = self.lr
passes = [30, 60, 80, 90]
steps_per_pass = 10
bd = [steps_per_pass * p for p in passes]
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr)
opt = fluid.optimizer.Momentum(learning_rate=lr_val, momentum=0.9)
# Reader
train_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size)
if dist_strategy:
fleet.init(is_collective=True)
strategy = fleet.DistributedStrategy()
strategy.pipeline = True
dist_opt = fleet.distributed_optimizer(
optimizer=opt, strategy=strategy)
dist_opt.minimize(avg_cost)
else:
opt.minimize(avg_cost)
if dist_strategy:
return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict, data_loader
else:
return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict
if __name__ == "__main__":
runtime_main(TestDistMnist2x2)
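As a side note, a worked example of the piecewise schedule built in get_model above (base_lr is self.lr, supplied by the harness; 0.001 is only an assumed value for illustration):
base_lr = 0.001            # assumption for illustration; the real value comes from args.lr
passes = [30, 60, 80, 90]
steps_per_pass = 10
bd = [steps_per_pass * p for p in passes]                 # [300, 600, 800, 900]
lr = [base_lr * (0.1 ** i) for i in range(len(bd) + 1)]   # [1e-3, 1e-4, 1e-5, 1e-6, 1e-7]
# piecewise_decay switches to the next value each time the global step passes a boundary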
......@@ -124,6 +124,67 @@ class TestDistRunnerBase(object):
exe.run(pserver_prog)
print_to_err(type(self).__name__, "run pserver main program done.")
def run_pipeline_trainer(self, args):
self.lr = args.lr
dist_strategy = DistributedStrategy()
test_program, avg_cost, train_reader, test_reader, batch_acc, predict, data_loader = \
self.get_model(batch_size=args.batch_size, dist_strategy=dist_strategy)
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
eprint(type(self).__name__, "device_id: %d." % device_id)
place = fluid.CUDAPlace(device_id)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
eprint(type(self).__name__, "run worker startup program done.")
data_loader.set_sample_list_generator(train_reader, place)
data_loader.start()
print_to_err(type(self).__name__, "begin to train on trainer")
out_losses = []
for i in six.moves.xrange(RUN_STEP):
loss = exe.run(fluid.default_main_program(), fetch_list=[avg_cost])
loss = loss[0] if loss else None
out_losses.append(loss)
print_to_err(type(self).__name__, "run step %d finished" % i)
print_to_err(type(self).__name__, "trainer run finished")
if six.PY2:
print(pickle.dumps(out_losses))
else:
sys.stdout.buffer.write(pickle.dumps(out_losses))
if args.save_model:
model_save_dir = "/tmp"
if fleet.worker_index() == 0:
model_save_dir_fluid = os.path.join(model_save_dir,
"fluid_persistables")
model_save_dir_fleet = os.path.join(model_save_dir,
"fleet_persistables")
infer_save_dir_fluid = os.path.join(model_save_dir,
"fluid_infer")
infer_save_dir_fleet = os.path.join(model_save_dir,
"fleet_infer")
else:
model_save_dir_fluid = os.path.join(model_save_dir,
"fluid_persistables_2")
model_save_dir_fleet = os.path.join(model_save_dir,
"fleet_persistables_2")
infer_save_dir_fluid = os.path.join(model_save_dir,
"fluid_infer_2")
infer_save_dir_fleet = os.path.join(model_save_dir,
"fleet_infer_2")
fluid.io.save_persistables(exe, model_save_dir_fluid,
fleet._origin_program)
fleet.save_persistables(executor=exe, dirname=model_save_dir_fleet)
feeded_var_names = [var.name for var in feed_var_list]
fluid.io.save_inference_model(infer_save_dir_fluid,
feeded_var_names, [avg_cost], exe,
fleet._origin_program)
fleet.save_inference_model(exe, infer_save_dir_fleet,
feeded_var_names, [avg_cost])
def run_gpu_fleet_api_trainer(self, args):
assert args.update_method == "nccl2"
......@@ -532,6 +593,7 @@ def runtime_main(test_class):
parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
parser.add_argument('--enable_backward_deps', action='store_true')
parser.add_argument('--use_hallreduce', action='store_true')
parser.add_argument('--use_pipeline', action='store_true')
parser.add_argument('--gpu_fleet_api', action='store_true')
parser.add_argument('--use_local_sgd', action='store_true')
parser.add_argument('--ut4grad_allreduce', action='store_true')
......@@ -566,6 +628,8 @@ def runtime_main(test_class):
model.run_pserver(args)
elif args.gpu_fleet_api:
model.run_gpu_fleet_api_trainer(args)
elif args.use_pipeline:
model.run_pipeline_trainer(args)
else:
model.run_trainer(args)
......@@ -607,6 +671,7 @@ class TestDistBase(unittest.TestCase):
self._dc_asgd = False # must use with async mode
self._use_reader_alloc = True
self._nccl2_mode = False
self._pipeline_mode = False
self._mp_mode = False
# FIXME(typhoonzero): I added this stupid argument to enable
# testing allreduce layers, which users can call layers.allreduce
......@@ -892,6 +957,8 @@ class TestDistBase(unittest.TestCase):
if self._use_dgc:
tr_cmd += " --use_dgc"
if self._pipeline_mode:
tr_cmd += " --use_pipeline"
if self._mp_mode:
env = {"FLAGS_selected_gpus": "{}".format(trainer_id % 2)}
......@@ -978,6 +1045,51 @@ class TestDistBase(unittest.TestCase):
print("outs[1]:", outs[1])
return pickle.loads(outs[0]), pickle.loads(outs[1])
def _run_pipeline(self, model, envs, check_error_log, log_name):
# NOTE: we reuse ps_endpoints as nccl2 worker endpoints
worker_endpoints = self._ps_endpoints.split(",")
update_method = "nccl2"
trainer_num = len(worker_endpoints)
procs = []
pipes = []
for i in range(0, trainer_num):
tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
model, worker_endpoints[i], update_method, i, trainer_num)
tr_env.update(envs)
tr_env['CUDA_VISIBLE_DEVICES'] = "0,1"
tr_env['NCCL_SHM_DISABLE'] = '1'
tr_env['FLAGS_selected_gpus'] = str(i)
tr_env['FLAGS_cudnn_deterministic'] = '0'
print("tr_cmd:{}, env: {}".format(tr_cmd, tr_env))
tr_pipe = open("/tmp/" + "tr{}_err.log".format(i), "wb")
print_to_err(
type(self).__name__,
"going to start process {} with nccl2".format(i))
tr_proc = subprocess.Popen(
tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env)
procs.append(tr_proc)
pipes.append(tr_pipe)
outs = []
for i in range(0, trainer_num):
tr_out, tr_err = procs[i].communicate()
outs.append(tr_out)
pipes[i].close()
sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err))
if check_error_log:
print("outs[0]:", outs[0])
print("outs[1]:", outs[1])
return pickle.loads(outs[0]), pickle.loads(outs[1])
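Condensed, the per-rank environment assembled above (values exactly as set in _run_pipeline; trainer id, endpoints and the nccl2 update method come from _get_nccl2_trainer_cmd):
def pipeline_rank_env(rank):
    # For the 2-stage pipeline test, every trainer sees both GPUs but computes on its own one.
    return {
        'CUDA_VISIBLE_DEVICES': '0,1',      # both devices visible to each rank
        'FLAGS_selected_gpus': str(rank),   # stage -> GPU mapping
        'NCCL_SHM_DISABLE': '1',
        'FLAGS_cudnn_deterministic': '0',
    }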
def _get_required_envs(self, check_error_log=False, need_envs={}):
# TODO(typhoonzero): should auto adapt GPU count on the machine.
required_envs = {
......@@ -1032,6 +1144,9 @@ class TestDistBase(unittest.TestCase):
False,
check_error_log,
log_name=log_name)
elif self._pipeline_mode:
tr0_losses, tr1_losses = self._run_pipeline(
model_file, required_envs, check_error_log, log_name=log_name)
else:
tr0_losses, tr1_losses = self._run_cluster(
model_file, required_envs, check_error_log, log_name=log_name)
......@@ -1040,6 +1155,9 @@ class TestDistBase(unittest.TestCase):
local_loss = local_losses[step_id]
tr0_loss = tr0_losses[step_id]
tr1_loss = tr1_losses[step_id]
if self._pipeline_mode:
dist_loss = np.array([tr1_loss])
else:
dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
print("=======", local_loss, ":", dist_loss[0], "=======")
self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
......
......@@ -16,6 +16,8 @@ import unittest
import paddle
import os
paddle.enable_static()
class TestFleetMetaOptimizer(unittest.TestCase):
def setUp(self):
......@@ -28,19 +30,14 @@ class TestFleetMetaOptimizer(unittest.TestCase):
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
with paddle.fluid.device_guard("cpu"):
with paddle.fluid.device_guard("gpu:0"):
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(
name="y", shape=[1], dtype='int64')
data_loader = paddle.fluid.io.DataLoader.from_generator(
feed_list=[input_x, input_y],
capacity=64,
use_double_buffer=True,
iterable=False)
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
with paddle.fluid.device_guard("gpu:0"):
with paddle.fluid.device_guard("gpu:1"):
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2],
size=2,
......
......@@ -13,212 +13,32 @@
# limitations under the License.
from __future__ import print_function
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as layers
import numpy as np
import os
import shutil
import unittest
import math
def conv_bn_layer(input, num_filters, filter_size, stride=1, groups=1,
act=None):
conv = fluid.layers.conv2d(
input=input,
num_filters=num_filters,
filter_size=filter_size,
stride=stride,
padding=(filter_size - 1) // 2,
groups=groups,
act=None,
bias_attr=False)
return fluid.layers.batch_norm(
input=conv,
act=act, )
def shortcut(input, ch_out, stride, is_first):
ch_in = input.shape[1]
if ch_in != ch_out or stride != 1 or is_first == True:
return conv_bn_layer(input, ch_out, 1, stride)
else:
return input
def bottleneck_block(input, num_filters, stride):
conv0 = conv_bn_layer(
input=input, num_filters=num_filters, filter_size=1, act='relu')
conv1 = conv_bn_layer(
input=conv0,
num_filters=num_filters,
filter_size=3,
stride=stride,
act='relu')
conv2 = conv_bn_layer(
input=conv1, num_filters=num_filters * 4, filter_size=1, act=None)
short = shortcut(input, num_filters * 4, stride, is_first=False)
return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')
def basic_block(input, num_filters, stride, is_first):
conv0 = conv_bn_layer(
input=input,
num_filters=num_filters,
filter_size=3,
act='relu',
stride=stride)
conv1 = conv_bn_layer(
input=conv0, num_filters=num_filters, filter_size=3, act=None)
short = shortcut(input, num_filters, stride, is_first)
return fluid.layers.elementwise_add(x=short, y=conv1, act='relu')
from test_dist_base import TestDistBase
def build_network(input, layers=50, class_dim=1000):
supported_layers = [18, 34, 50, 101, 152]
assert layers in supported_layers
depth = None
if layers == 18:
depth = [2, 2, 2, 2]
elif layers == 34 or layers == 50:
depth = [3, 4, 6, 3]
elif layers == 101:
depth = [3, 4, 23, 3]
elif layers == 152:
depth = [3, 8, 36, 3]
num_filters = [64, 128, 256, 512]
with fluid.device_guard("cpu"):
conv = conv_bn_layer(
input=input, num_filters=64, filter_size=7, stride=2, act='relu')
conv = fluid.layers.pool2d(
input=conv,
pool_size=3,
pool_stride=2,
pool_padding=1,
pool_type='max')
if layers >= 50:
for block in range(len(depth)):
with fluid.device_guard("gpu:0"):
for i in range(depth[block]):
conv = bottleneck_block(
input=conv,
num_filters=num_filters[block],
stride=2 if i == 0 and block != 0 else 1)
with fluid.device_guard("gpu:0"):
pool = fluid.layers.pool2d(
input=conv, pool_size=7, pool_type='avg', global_pooling=True)
stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
out = fluid.layers.fc(
input=pool,
size=class_dim,
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stdv, stdv)))
else:
for block in range(len(depth)):
with fluid.device_guard("gpu:0"):
for i in range(depth[block]):
conv = basic_block(
input=conv,
num_filters=num_filters[block],
stride=2 if i == 0 and block != 0 else 1,
is_first=block == i == 0)
with fluid.device_guard("gpu:0"):
pool = fluid.layers.pool2d(
input=conv, pool_size=7, pool_type='avg', global_pooling=True)
stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
out = fluid.layers.fc(
input=pool,
size=class_dim,
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stdv, stdv)))
return out
class TestPipeline(unittest.TestCase):
""" TestCases for Pipeline Training. """
def _run(self, debug):
main_prog = fluid.Program()
startup_prog = fluid.Program()
with fluid.program_guard(main_prog, startup_prog):
with fluid.device_guard("cpu"):
image = fluid.layers.data(
name="image", shape=[3, 224, 224], dtype="float32")
label = fluid.layers.data(
name="label", shape=[1], dtype="int64")
data_loader = fluid.io.DataLoader.from_generator(
feed_list=[image, label],
capacity=64,
use_double_buffer=True,
iterable=False)
fc = build_network(image, layers=50)
with fluid.device_guard("gpu:0"):
out, prob = fluid.layers.softmax_with_cross_entropy(
logits=fc, label=label, return_softmax=True)
loss = fluid.layers.mean(out)
acc_top1 = fluid.layers.accuracy(input=prob, label=label, k=1)
acc_top5 = fluid.layers.accuracy(input=prob, label=label, k=5)
base_lr = 0.1
passes = [30, 60, 80, 90]
total_images = 1281167
steps_per_pass = total_images // 128
bd = [steps_per_pass * p for p in passes]
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr)
optimizer = fluid.optimizer.MomentumOptimizer(
lr_val,
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
optimizer = fluid.optimizer.PipelineOptimizer(
optimizer, num_microbatches=2)
optimizer.minimize(loss)
def train_reader():
for _ in range(4):
img = np.random.random(size=[3, 224, 224]).astype('float32')
label = np.random.random(size=[1]).astype('int64')
yield img, label
data_loader.set_sample_generator(train_reader, batch_size=1)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
data_loader.start()
exe.train_from_dataset(main_prog, debug=debug)
def test_pipeline(self):
self._run(False)
self._run(True)
def test_pipeline_noneoptimizer(self):
with fluid.device_guard("gpu:0"):
x = fluid.layers.data(
name='x', shape=[1], dtype='int64', lod_level=0)
y = fluid.layers.data(
name='y', shape=[1], dtype='int64', lod_level=0)
emb_x = layers.embedding(
input=x,
param_attr=fluid.ParamAttr(name="embx"),
size=[10, 2],
is_sparse=False)
fc = layers.fc(input=emb_x,
name="fc",
size=1,
num_flatten_dims=1,
bias_attr=False)
loss = layers.reduce_mean(fc)
import os
import paddle
optimizer = fluid.optimizer.SGD(learning_rate=0.5)
with self.assertRaises(ValueError):
optimizer = fluid.optimizer.PipelineOptimizer(
dict(), num_microbatches=2)
paddle.enable_static()
flag_name = os.path.splitext(__file__)[0]
class TestPipeline(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_reduce = False
self._use_reader_alloc = False
self._pipeline_mode = True
self._nccl_comm_num = 1
def test_dist_train(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"pipeline_mnist.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
if __name__ == '__main__':
......