From d999049f6cb66aa7a7a7c80a577a374e680f0481 Mon Sep 17 00:00:00 2001
From: ziyoujiyi <73728031+ziyoujiyi@users.noreply.github.com>
Date: Thu, 2 Jun 2022 12:46:17 +0800
Subject: [PATCH] add federated learning parameter server (fl-ps) mode (#42682)

* back fl

* delete ssl cert

* make warning

* unittest parallel degree

* solve unittest

* heter & multi cloud comm ready

* fl-ps v1.0

* support N + N mode

* delete print

---
 CMakeLists.txt | 1 +
 cmake/configure.cmake | 4 +
 .../distributed/ps/service/brpc_ps_server.cc | 0
 .../distributed/ps/service/heter_client.cc | 3 +-
 .../distributed/ps/service/heter_client.h | 8 +-
 .../distributed/ps/service/heter_server.cc | 4 -
 .../distributed/ps/service/heter_server.h | 13 +-
 paddle/fluid/framework/data_feed.cc | 12 +-
 .../framework/distributed_strategy.proto | 1 +
 .../fluid/framework/heter_pipeline_trainer.cc | 61 ++-
 .../fluid/framework/heter_section_worker.cc | 101 ++++-
 .../fleet/base/distributed_strategy.py | 12 +
 .../distributed/fleet/base/util_factory.py | 20 +
 .../fleet/meta_optimizers/ps_optimizer.py | 1 +
 .../distributed/passes/ps_trainer_pass.py | 353 ++++++++++++++++--
 python/paddle/distributed/ps/the_one_ps.py | 38 +-
 .../paddle/distributed/ps/utils/ps_factory.py | 5 +-
 .../ps/utils/ps_program_builder.py | 104 +++++-
 python/paddle/distributed/ps/utils/public.py | 99 +++--
 python/paddle/fluid/executor.py | 58 ++-
 .../fluid/tests/custom_op/ps_usr_print_log | 0
 .../tests/unittests/ps/dataset_generator_A.py | 49 +++
 .../tests/unittests/ps/dataset_generator_B.py | 53 +++
 .../fluid/tests/unittests/ps/download_data.sh | 27 ++
 .../unittests/ps/fl_async_ps_config.yaml | 39 ++
 .../fluid/tests/unittests/ps/fl_ps_trainer.py | 145 +++++++
 .../tests/unittests/ps/ps_dnn_trainer.py | 33 +-
 .../fluid/tests/unittests/ps/test_fl_ps.py | 51 +++
 .../fluid/tests/unittests/ps_dnn_model.py | 172 ++++++++-
 29 files changed, 1300 insertions(+), 167 deletions(-)
 mode change 100644 => 100755 paddle/fluid/distributed/ps/service/brpc_ps_server.cc
 mode change 100644 => 100755 paddle/fluid/distributed/ps/service/heter_client.h
 mode change 100644 => 100755 paddle/fluid/framework/data_feed.cc
 mode change 100644 => 100755 paddle/fluid/framework/distributed_strategy.proto
 mode change 100644 => 100755 paddle/fluid/framework/heter_section_worker.cc
 mode change 100644 => 100755 python/paddle/distributed/fleet/base/distributed_strategy.py
 mode change 100644 => 100755 python/paddle/distributed/fleet/base/util_factory.py
 mode change 100644 => 100755 python/paddle/fluid/executor.py
 delete mode 100644 python/paddle/fluid/tests/custom_op/ps_usr_print_log
 create mode 100755 python/paddle/fluid/tests/unittests/ps/dataset_generator_A.py
 create mode 100755 python/paddle/fluid/tests/unittests/ps/dataset_generator_B.py
 create mode 100755 python/paddle/fluid/tests/unittests/ps/download_data.sh
 create mode 100755 python/paddle/fluid/tests/unittests/ps/fl_async_ps_config.yaml
 create mode 100755 python/paddle/fluid/tests/unittests/ps/fl_ps_trainer.py
 create mode 100755 python/paddle/fluid/tests/unittests/ps/test_fl_ps.py

diff --git a/CMakeLists.txt b/CMakeLists.txt
index f3ed08d56e6..70eb5f11ea1 100755
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -255,6 +255,7 @@ option(WITH_POCKETFFT "Compile with pocketfft support" ON)
 option(WITH_RECORD_BUILDTIME "Compile PaddlePaddle with record all targets build time" OFF)
 option(WITH_CUSTOM_DEVICE "Compile with custom device support" OFF)
 option(WITH_ARM_BRPC "Supprot Brpc in Arm" OFF)
+option(WITH_FLPS "FL PS mode" OFF)
mode" OFF) if(WITH_RECORD_BUILDTIME) set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh ${CMAKE_CURRENT_BINARY_DIR}") diff --git a/cmake/configure.cmake b/cmake/configure.cmake index 5608b6f6f34..63ca901a940 100755 --- a/cmake/configure.cmake +++ b/cmake/configure.cmake @@ -78,6 +78,10 @@ if(WITH_ARM_BRPC) add_definitions(-DPADDLE_WITH_ARM_BRPC) endif() +if(WITH_FLPS) + add_definitions(-DPADDLE_WITH_FLPS) +endif() + if(WITH_GLOO) add_definitions(-DPADDLE_WITH_GLOO) endif() diff --git a/paddle/fluid/distributed/ps/service/brpc_ps_server.cc b/paddle/fluid/distributed/ps/service/brpc_ps_server.cc old mode 100644 new mode 100755 diff --git a/paddle/fluid/distributed/ps/service/heter_client.cc b/paddle/fluid/distributed/ps/service/heter_client.cc index fd0962caaae..44c03ca1757 100755 --- a/paddle/fluid/distributed/ps/service/heter_client.cc +++ b/paddle/fluid/distributed/ps/service/heter_client.cc @@ -139,8 +139,9 @@ void HeterClient::SendAndRecvAsync( message_name, send_var_name_val, recv_var_name_val, *p_ctx, p_scope, &request, &request_io_buffer); - int micro_id = GetMicroId(ctx, p_scope); + int micro_id = GetMicroId(ctx, p_scope); // global auto minibatch_id = micro_id / 10; + VLOG(4) << "micro_id: " << micro_id; // select channel according to micro id if (mode == "forward") { int num = minibatch_id % xpu_channels_.size(); diff --git a/paddle/fluid/distributed/ps/service/heter_client.h b/paddle/fluid/distributed/ps/service/heter_client.h old mode 100644 new mode 100755 index efaa48470a8..7683b8a1679 --- a/paddle/fluid/distributed/ps/service/heter_client.h +++ b/paddle/fluid/distributed/ps/service/heter_client.h @@ -155,13 +155,13 @@ class HeterClient { // HeterClient singleton static std::shared_ptr GetInstance( - const std::vector& endpoint, - const std::vector& previous_endpoint, + const std::vector& endpoints, + const std::vector& previous_endpoints, const int& trainer_id) { if (NULL == s_instance_) { s_instance_.reset(new HeterClient()); - s_instance_->SetXpuList(endpoint); - s_instance_->SetPreviousXpuList(previous_endpoint); + s_instance_->SetXpuList(endpoints); + s_instance_->SetPreviousXpuList(previous_endpoints); s_instance_->SetTrainerID(trainer_id); s_instance_->CreateClient2XpuConnection(); } diff --git a/paddle/fluid/distributed/ps/service/heter_server.cc b/paddle/fluid/distributed/ps/service/heter_server.cc index fd38a030ff3..4440647ac94 100755 --- a/paddle/fluid/distributed/ps/service/heter_server.cc +++ b/paddle/fluid/distributed/ps/service/heter_server.cc @@ -94,7 +94,6 @@ void HeterServer::StartHeterInterService(bool neeed_encrypt) { VLOG(4) << "switch inter server server start success! 
listen on " << endpoint_inter_; } - { std::lock_guard lock(this->mutex_ready_); stoped_ = false; @@ -115,9 +114,6 @@ void HeterServer::SetFanin(const int& fan_in) { service_.SetFanin(fan_in); } void HeterServer::WaitServerReady() { std::unique_lock lock(this->mutex_ready_); condition_ready_.wait(lock, [=] { return this->ready_ == 1; }); - while (!this->ready_) { - sleep(1); - } } int SendAndRecvVariableHandler::SaveInSwitchWithShard( diff --git a/paddle/fluid/distributed/ps/service/heter_server.h b/paddle/fluid/distributed/ps/service/heter_server.h index ddcf36bf68d..97028066e66 100755 --- a/paddle/fluid/distributed/ps/service/heter_server.h +++ b/paddle/fluid/distributed/ps/service/heter_server.h @@ -90,8 +90,10 @@ class ServiceHandlerBase { using SharedMiniScope = std::shared_ptr>; + using SharedMicroScope = std::shared_ptr>>>; + using SharedTaskQueue = std::shared_ptr< std::unordered_map>>>>; @@ -226,6 +228,7 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase { auto* tensor = var->GetMutable(); auto data = reinterpret_cast(tensor->data()); auto micro_id = static_cast(data[0]); + VLOG(4) << "micro_id in heter server: " << micro_id; int minibatch_index = micro_id / 10; int microbatch_index = micro_id % 10; @@ -261,6 +264,9 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase { distributed::DeserializeFromMultiVarMsgAndIOBuf( *request, &request_io_buffer, *dev_ctx_, micro_scope); // blocking queue handles multi thread + VLOG(4) << "Handle in HeterServer: " << message_name << ", " + << microbatch_index; + VLOG(4) << "task_queue_ size: " << task_queue_->size(); (*task_queue_)[minibatch_index]->Push( std::make_pair(message_name, microbatch_index)); @@ -274,6 +280,7 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase { distributed::SerializeToMultiVarMsgAndIOBuf( message_name, response_var_names, empty_var_names, *dev_ctx_, &local_scope, response, &response_io_buffer); + VLOG(4) << "Handle over"; return 0; } @@ -612,11 +619,9 @@ class HeterServer { // HeterWrapper singleton static std::shared_ptr GetInstance() { + std::unique_lock lock(mtx_); if (s_instance_ == nullptr) { - std::unique_lock lock(mtx_); - if (NULL == s_instance_) { - s_instance_.reset(new HeterServer()); - } + s_instance_.reset(new HeterServer()); } return s_instance_; } diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc old mode 100644 new mode 100755 index b63f317aae8..0801aa0e56a --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -220,6 +220,7 @@ bool DataFeed::PickOneFile(std::string* filename) { file_idx_, platform::errors::PreconditionNotMet( "You should call SetFileListIndex before PickOneFile")); std::unique_lock lock(*mutex_for_pick_file_); + VLOG(4) << "filelist_ size: " << filelist_.size(); if (*file_idx_ == filelist_.size()) { VLOG(3) << "DataFeed::PickOneFile no more file to pick"; return false; @@ -284,6 +285,7 @@ void PrivateQueueDataFeed::SetQueueSize(int queue_size) { template bool PrivateQueueDataFeed::Start() { + VLOG(4) << "entering PrivateQueueDataFeed::Start()"; CheckSetFileList(); read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this); read_thread_.detach(); @@ -295,6 +297,7 @@ bool PrivateQueueDataFeed::Start() { template void PrivateQueueDataFeed::ReadThread() { #ifdef _LINUX + VLOG(4) << "entering PrivateQueueDataFeed::ReadThread()"; std::string filename; while (PickOneFile(&filename)) { int err_no = 0; @@ -356,6 +359,7 @@ InMemoryDataFeed::InMemoryDataFeed() { 
template bool InMemoryDataFeed::Start() { #ifdef _LINUX + VLOG(4) << "entering InMemoryDataFeed::Start()"; this->CheckSetFileList(); if (output_channel_->Size() == 0 && input_channel_->Size() != 0) { std::vector data; @@ -664,6 +668,7 @@ void MultiSlotDataFeed::Init( void MultiSlotDataFeed::ReadThread() { #ifdef _LINUX + VLOG(4) << "entering MultiSlotDataFeed::ReadThread()"; std::string filename; while (PickOneFile(&filename)) { int err_no = 0; @@ -831,7 +836,6 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe( } else { int use_slots_num = use_slots_.size(); instance->resize(use_slots_num); - const char* str = reader.get(); std::string line = std::string(str); @@ -971,10 +975,13 @@ void MultiSlotDataFeed::PutToFeedVec( if (feed_vec_[i] == nullptr) { continue; } + VLOG(4) << "MultiSlotDataFeed::PutToFeedVec i: " << i; const auto& type = ins_vec[i].GetType(); const auto& offset = ins_vec[i].GetOffset(); int total_instance = static_cast(offset.back()); - + VLOG(4) << "total_instance: " << total_instance; + // platform::CPUPlace() + VLOG(4) << "this->place_: " << this->place_; if (type[0] == 'f') { // float const auto& feasign = ins_vec[i].GetFloatData(); float* tensor_ptr = @@ -2573,6 +2580,7 @@ void SlotRecordInMemoryDataFeed::ExpandSlotRecord(SlotRecord* rec) { } bool SlotRecordInMemoryDataFeed::Start() { + VLOG(4) << "entering SlotRecordInMemoryDataFeed::Start"; #ifdef _LINUX this->CheckSetFileList(); if (input_channel_->Size() != 0) { diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto old mode 100644 new mode 100755 index 94753f8dd38..b3a01ae169e --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -315,6 +315,7 @@ message DistributedStrategy { optional bool adam_d2sum = 36 [ default = false ]; optional bool auto_search = 37 [ default = false ]; optional bool heter_ccl_mode = 38 [ default = false ]; + optional bool is_fl_ps_mode = 39 [ default = false ]; optional RecomputeConfig recompute_configs = 101; optional AMPConfig amp_configs = 102; diff --git a/paddle/fluid/framework/heter_pipeline_trainer.cc b/paddle/fluid/framework/heter_pipeline_trainer.cc index d0d3c2fea3b..dc99885811c 100644 --- a/paddle/fluid/framework/heter_pipeline_trainer.cc +++ b/paddle/fluid/framework/heter_pipeline_trainer.cc @@ -32,7 +32,9 @@ using TaskQueue = std::pair>>>; void HeterPipelineTrainer::ResetDataset(Dataset* dataset) { +#ifndef PADDLE_WITH_FLPS if (pipeline_stage_ == 0) { +#endif SetDataset(dataset); const std::vector readers = dataset->GetReaders(); @@ -51,40 +53,39 @@ void HeterPipelineTrainer::ResetDataset(Dataset* dataset) { this_worker->SetDataFeed(readers[cnt]); this_worker->SetReaderPlace(place_); } +#ifndef PADDLE_WITH_FLPS } +#endif } void HeterPipelineTrainer::Initialize(const TrainerDesc& trainer_desc, Dataset* dataset) { + trainer_desc_ = trainer_desc; thread_num_ = trainer_desc.thread_num(); ParseDumpConfig(trainer_desc); SetDebug(trainer_desc.debug()); const std::vector readers = dataset->GetReaders(); - VLOG(3) << "readers num: " << readers.size(); // change thread num to readers num thread_num_ = readers.size(); - VLOG(3) << "worker thread num: " << thread_num_; + VLOG(3) << "worker(readers) thread num: " << thread_num_; const auto& heter_section_params = trainer_desc.heter_section_param(); num_pipeline_stages_ = heter_section_params.num_pipeline_stages(); pipeline_stage_ = heter_section_params.pipeline_stage(); num_microbatches_ = 
heter_section_params.num_microbatches(); VLOG(3) << "Number of microbatches per minibatch: " << num_microbatches_; - trainer_desc_ = trainer_desc; trainer_id_ = trainer_desc.trainer_id(); for (int i = 0; i < num_pipeline_stages_; ++i) { auto trainer_num = trainer_desc.trainers(i); trainers_.push_back(trainer_num); } int cpu_trainer_num = trainers_[0]; - // int cur_stage_trainer_num = trainers_[pipeline_stage_]; - // int global_thread_num = cpu_trainer_num * thread_num_; - // int previous_trainers = 0; - // for (int i = 0; i < pipeline_stage_; i++) previous_trainers += - // trainers_[i]; - // int stage_trainer_id = - // trainer_id_ - previous_trainers; // trainer id in current stage - + VLOG(4) << "trainer_id_: " << trainer_id_; + VLOG(4) << "cpu_trainer_num: " << cpu_trainer_num + << " xpu_trainer_num: " << trainers_[1]; +#ifdef PADDLE_WITH_FLPS + thread_num_ = 1; +#endif if (pipeline_stage_ == 0) { // for cpu trainer int cnt = -1; int real_thread_id = trainer_id_; @@ -103,25 +104,33 @@ void HeterPipelineTrainer::Initialize(const TrainerDesc& trainer_desc, this_worker->InitRandomDumpConfig(trainer_desc); this_worker->SetDeviceIndex(real_thread_id); real_thread_id += cpu_trainer_num; - // if (pipeline_stage_ == 0) { this_worker->SetDataFeed(readers[cnt]); - //} this_worker->SetMicrobatchNum(num_microbatches_); this_worker->SetPipelineStageNum(num_pipeline_stages_); this_worker->SetPipelineStage(pipeline_stage_); } - } else { // for heter_trainer - // heter trainer with thread_id == -1 is not for - // real training + } else { + // for heter_trainer + // heter trainer with thread_id == -1 is not for real training, just for run + // listen op workers_[-1] = DeviceWorkerFactory::CreateDeviceWorker( trainer_desc.device_worker_name()); auto this_worker = std::dynamic_pointer_cast( workers_[-1]); +#ifdef PADDLE_WITH_FLPS + this_worker->SetDebug(debug_); + this_worker->SetNeedDumpField(need_dump_field_); + this_worker->SetNeedDumpParam(need_dump_param_); + this_worker->SetDumpFieldVector(dump_fields_); + this_worker->SetDumpParamVector(dump_param_); + this_worker->InitRandomDumpConfig(trainer_desc); + this_worker->SetDataFeed(readers[0]); +#endif + this_worker->SetDeviceIndex(-1); this_worker->SetMicrobatchNum(num_microbatches_); this_worker->SetPipelineStageNum(num_pipeline_stages_); this_worker->SetPipelineStage(pipeline_stage_); - this_worker->SetDeviceIndex(-1); } } @@ -159,14 +168,19 @@ void HeterPipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program, for (auto& worker_pair : workers_) { auto worker_index = worker_pair.first; auto device_worker = worker_pair.second; + VLOG(0) << "workers index in InitTrainerEnv: " << worker_index; auto this_worker = std::dynamic_pointer_cast( device_worker); this_worker->SetPlace(place); this_worker->Initialize(trainer_desc_); +#ifdef PADDLE_WITH_FLPS + this_worker->SetReaderPlace(place); +#else if (pipeline_stage_ == 0) { this_worker->SetReaderPlace(place); } +#endif this_worker->SetRootScope(root_scope_); // generate mini_batch scope for every worker auto* minibatch_scope = &root_scope_->NewScope(); @@ -175,6 +189,7 @@ void HeterPipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program, // after set micro num & mini batch scope this_worker->CreateMicrobatchScopes(); (*micro_scopes_)[worker_index] = this_worker->GetMicrobatchScopes(); + VLOG(4) << "worker_index: " << worker_index; (*task_queue_)[worker_index] = this_worker->GetThreadQueue(); } } @@ -182,6 +197,7 @@ void HeterPipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program, void 
HeterPipelineTrainer::Run() { VLOG(3) << "Going to run HeterPipelineTrainer::Run()"; if (listen_ptr_ == nullptr) { + VLOG(3) << "listen_ptr_ is null"; for (auto& worker_pair : workers_) { auto& device_worker = worker_pair.second; auto worker_0 = @@ -196,10 +212,14 @@ void HeterPipelineTrainer::Run() { heter_server->WaitServerReady(); heter_server->SetMiniBatchScopes(mini_scopes_); heter_server->SetMicroBatchScopes(micro_scopes_); + VLOG(4) << "heter_server SetTaskQueue"; heter_server->SetTaskQueue(task_queue_); + // main training logic + VLOG(3) << "pipeline_stage_ is " << pipeline_stage_; if (pipeline_stage_ == 0) { // for cpu trainer for (auto& worker_pair : workers_) { + VLOG(4) << "cpu worker index : " << worker_pair.first; auto device_worker = worker_pair.second; if (!debug_) { threads_.push_back( @@ -212,6 +232,7 @@ void HeterPipelineTrainer::Run() { } else { // for heter worker // start thread_worker with thread_id = -1 for (auto& worker_pair : workers_) { + VLOG(4) << "xpu worker index : " << worker_pair.first; auto device_worker = worker_pair.second; if (!debug_) { threads_.push_back( @@ -252,6 +273,10 @@ void HeterPipelineTrainer::Run() { this_worker->SetPipelineStageNum(num_pipeline_stages_); this_worker->SetPipelineStage(pipeline_stage_); this_worker->SetPlace(place_); +#ifdef PADDLE_WITH_FLPS + this_worker->SetDataFeed(workers_[-1]->device_reader_); + this_worker->SetReaderPlace(place_); +#endif this_worker->Initialize(trainer_desc_); this_worker->SetRootScope(root_scope_); @@ -308,5 +333,5 @@ Scope* HeterPipelineTrainer::GetWorkerScope(int thread_id) { } } // end namespace framework -} // end namespace paddle +} // namespace paddle #endif diff --git a/paddle/fluid/framework/heter_section_worker.cc b/paddle/fluid/framework/heter_section_worker.cc old mode 100644 new mode 100755 index b6759bb2e6f..acbfe21ecda --- a/paddle/fluid/framework/heter_section_worker.cc +++ b/paddle/fluid/framework/heter_section_worker.cc @@ -65,6 +65,52 @@ class TrainerDesc; uint64_t HeterSectionWorker::batch_id_(0); +#ifdef PADDLE_WITH_FLPS +void HeterSectionWorker::Initialize(const TrainerDesc& desc) { + trainer_desc_ = desc; + fetch_config_ = desc.fetch_config(); + dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_); + program_.reset(new ProgramDesc( + desc.heter_section_param().section_config().program_desc())); + thread_queue_.reset( + new ::paddle::framework::BlockingQueue>()); + VLOG(4) << "addr of thread_queue_ is: " << thread_queue_.get(); + bool is_first_stage = (pipeline_stage_ == 0); + bool is_last_stage = (pipeline_stage_ + 1 == num_pipeline_stages_); + + if (is_first_stage) { + VLOG(0) << "entering first stage"; + for (auto& op_desc : program_->Block(0).AllOps()) { + forward_ops_.push_back(std::move(OpRegistry::CreateOp(*op_desc))); + } + for (auto& op_desc : program_->Block(1).AllOps()) { + auto op = std::move(OpRegistry::CreateOp(*op_desc)); + auto op_type = op->Type(); + if (listen_op_ == nullptr && op_type == "heter_listen_and_serv") { + listen_op_ = std::move(op); + } else { + backward_ops_.push_back(std::move(op)); + } + } + } else if (is_last_stage) { + VLOG(0) << "HeterSectionWorker::Initialize for the last stage"; + for (auto& op_desc : program_->Block(0).AllOps()) { + auto op = std::move(OpRegistry::CreateOp(*op_desc)); + auto op_type = op->Type(); + if (listen_op_ == nullptr && op_type == "heter_listen_and_serv") { + listen_op_ = std::move(op); + } else { + forward_ops_.push_back(std::move(op)); + } + } + VLOG(0) << "test111"; + for (auto& op_desc : 
program_->Block(1).AllOps()) { + auto op = std::move(OpRegistry::CreateOp(*op_desc)); + backward_ops_.push_back(std::move(op)); + } + } +} +#else void HeterSectionWorker::Initialize(const TrainerDesc& desc) { trainer_desc_ = desc; fetch_config_ = desc.fetch_config(); @@ -122,6 +168,7 @@ void HeterSectionWorker::Initialize(const TrainerDesc& desc) { } } } +#endif void HeterSectionWorker::RunBackward(int micro_id) { for (size_t i = 0; i < backward_ops_.size(); i++) { @@ -147,8 +194,11 @@ void HeterSectionWorker::RunBackward(int micro_id) { void HeterSectionWorker::MiniBatchBarrier() { // get micro id & deserialize data std::set micro_ids; + VLOG(4) << "entering MiniBatchBarrier"; + VLOG(4) << "micro_ids_.size(): " << micro_ids_.size(); while (micro_ids.size() < micro_ids_.size()) { auto task = (*thread_queue_).Pop(); + VLOG(4) << "got one task from task que in cpu worker"; auto message_name = task.first; auto micro_id = task.second; PADDLE_ENFORCE_EQ(message_name.find("backward") != std::string::npos, true, @@ -164,19 +214,44 @@ void HeterSectionWorker::MiniBatchBarrier() { RunBackward(micro_id); batch_num_++; BatchPostProcess(); + VLOG(0) << "one task in cpu worker overed!"; } micro_ids_.clear(); } -void HeterSectionWorker::RunListen() { listen_op_->Run(*root_scope_, place_); } +void HeterSectionWorker::RunListen() { + VLOG(4) << ">>> run listen_op"; + listen_op_->Run(*root_scope_, place_); + VLOG(4) << "<<< run listen_op over"; +} void HeterSectionWorker::RunForward(int micro_id) { +#ifdef PADDLE_WITH_FLPS + BindingDataFeedMemory(micro_id); + if (debug_) { + timeline_.Start(); + } + int cur_micro_batch = device_reader_->Next(); + if (cur_micro_batch <= 0) { + VLOG(0) << "no more data in device_reader_"; + epoch_finish_ = true; + return; + } + if (debug_) { + timeline_.Pause(); + read_time_ += timeline_.ElapsedSec(); + total_time_ += timeline_.ElapsedSec(); + total_ins_num_ += cur_micro_batch; + } + VLOG(3) << "read a batch in thread " << thread_id_ << " micro " << micro_id; +#else if (pipeline_stage_ == 0) { BindingDataFeedMemory(micro_id); if (debug_) { timeline_.Start(); } - int cur_micro_batch = device_reader_->Next(); + int cur_micro_batch = + device_reader_->Next(); // batch_size is just micro_batch_size if (cur_micro_batch <= 0) { epoch_finish_ = true; return; @@ -189,6 +264,7 @@ void HeterSectionWorker::RunForward(int micro_id) { } VLOG(3) << "read a batch in thread " << thread_id_ << " micro " << micro_id; } +#endif for (size_t i = 0; i < forward_ops_.size(); i++) { auto& op = forward_ops_[i]; VLOG(3) << "Forward: start to run op " << op->Type() << " for micro-batch " @@ -301,7 +377,7 @@ void HeterSectionWorker::Run() { while (!epoch_finish_) { // forward for (int i = 0; i < num_microbatches_; i++) { - VLOG(5) << "Run " << i << " microbatch"; + VLOG(4) << "Run " << i << " microbatch"; RunForward(i); if (epoch_finish_ == true) { break; @@ -312,15 +388,19 @@ void HeterSectionWorker::Run() { if (micro_ids_.size() > 0) { MiniBatchBarrier(); } + VLOG(0) << "one batch run over! 
micro_ids_size: " << micro_ids_.size(); } } else { // for heter worker + VLOG(4) << "entering heter Run..."; auto heter_server = paddle::distributed::HeterServer::GetInstance(); while (true) { if (heter_server->IsStop()) { + VLOG(0) << "heter_server is stopped!!"; epoch_finish_ = true; break; } auto task = (*thread_queue_).Pop(); + VLOG(4) << "got one task from task que in heter worker"; auto message_name = task.first; auto micro_id = task.second; if (is_last_stage) { @@ -331,6 +411,8 @@ void HeterSectionWorker::Run() { RunBackward(micro_id); batch_num_++; BatchPostProcess(); + VLOG(0) << "one batch run over! micro_id: " << micro_id + << " batch_num: " << batch_num_; } else { if (message_name.find("forward") != std::string::npos) { RunForward(micro_id); @@ -371,6 +453,7 @@ void HeterSectionWorker::BatchPostProcess() { } void HeterSectionWorker::TrainFiles() { + VLOG(4) << "entering HeterSectionWorker::TrainFiles"; if (thread_id_ >= 0) { total_ins_num_ = 0; batch_num_ = 0; @@ -378,9 +461,17 @@ void HeterSectionWorker::TrainFiles() { timeline_.Start(); VLOG(3) << "begin section_worker TrainFiles"; epoch_finish_ = false; +#ifdef PADDLE_WITH_FLPS + if (device_reader_ == nullptr) { + VLOG(4) << "device_reader_ is null!!"; + } + device_reader_->Start(); +#else if (pipeline_stage_ == 0) { device_reader_->Start(); } +#endif + VLOG(4) << "Run in TrainFiles:"; while (!epoch_finish_) { Run(); dev_ctx_->Wait(); @@ -428,9 +519,13 @@ void HeterSectionWorker::TrainFilesWithProfiler() { total_ins_num_ = 0; op_name_.clear(); op_total_time_.clear(); +#ifdef PADDLE_WITH_FLPS + device_reader_->Start(); +#else if (pipeline_stage_ == 0) { device_reader_->Start(); } +#endif while (!epoch_finish_) { Run(); dev_ctx_->Wait(); diff --git a/python/paddle/distributed/fleet/base/distributed_strategy.py b/python/paddle/distributed/fleet/base/distributed_strategy.py old mode 100644 new mode 100755 index 414edb9b66d..fe997c08509 --- a/python/paddle/distributed/fleet/base/distributed_strategy.py +++ b/python/paddle/distributed/fleet/base/distributed_strategy.py @@ -1318,6 +1318,18 @@ class DistributedStrategy(object): """ return self.strategy.pipeline + @property + def is_fl_ps_mode(self): + return self.strategy.is_fl_ps_mode + + @is_fl_ps_mode.setter + @is_strict_auto + def is_fl_ps_mode(self, flag): + if isinstance(flag, bool): + self.strategy.is_fl_ps_mode = flag + else: + print("WARNING: is_fl_ps_mode should have value of bool type") + @pipeline.setter @is_strict_auto def pipeline(self, flag): diff --git a/python/paddle/distributed/fleet/base/util_factory.py b/python/paddle/distributed/fleet/base/util_factory.py old mode 100644 new mode 100755 index de101cd74c4..7f1712289e8 --- a/python/paddle/distributed/fleet/base/util_factory.py +++ b/python/paddle/distributed/fleet/base/util_factory.py @@ -204,6 +204,26 @@ class UtilBase(object): def _scatter(self): pass + def get_heter_file_shard(self, files): + if not isinstance(files, list): + raise TypeError("files should be a list of file need to be read.") + trainers = self.role_maker._worker_num() + trainer_id = self.role_maker._worker_index() - trainers + remainder = len(files) % trainers + blocksize = int(len(files) / trainers) + + blocks = [blocksize] * trainers + for i in range(remainder): + blocks[i] += 1 + + trainer_files = [[]] * trainers + begin = 0 + for i in range(trainers): + trainer_files[i] = files[begin:begin + blocks[i]] + begin += blocks[i] + + return trainer_files[trainer_id] + def get_file_shard(self, files): """ Split files before distributed training, and 
return filelist assigned to the current trainer. diff --git a/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py index d9062484bb5..d223ff032d4 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py @@ -75,6 +75,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): "use_ps_gpu"] attrs['lr_decay_steps'] = self.user_defined_strategy.a_sync_configs[ "lr_decay_steps"] + attrs['is_fl_ps_mode'] = self.user_defined_strategy.is_fl_ps_mode attrs['k_steps'] = self.user_defined_strategy.a_sync_configs["k_steps"] attrs['launch_barrier'] = self.user_defined_strategy.a_sync_configs[ "launch_barrier"] diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py index 6112a9a1f45..876e04b7081 100755 --- a/python/paddle/distributed/passes/ps_trainer_pass.py +++ b/python/paddle/distributed/passes/ps_trainer_pass.py @@ -17,9 +17,11 @@ import paddle import paddle.compat as cpt from ..ps.utils.public import * from paddle.framework import core -from .pass_base import PassBase, register_pass +from paddle.distributed.passes.pass_base import PassBase, register_pass from paddle.fluid.transpiler.details.program_utils import delete_ops from paddle.fluid.transpiler.collective import SingleProcessMultiThread +from _collections import deque, defaultdict +from paddle.fluid.framework import Program, Parameter @register_pass("append_send_ops_pass") @@ -47,7 +49,6 @@ class AppendSendOpsPass(PassBase): # 该 pass 被多种模式复用 if ps_mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]: dummy_output = program.global_block().create_var( name=framework.generate_control_dev_var_name()) - logger.info("dummy_output: {}".format(dummy_output)) program.global_block().append_op( type="send", inputs={"X": send_input_vars}, @@ -74,31 +75,27 @@ class AppendSendOpsPass(PassBase): # 该 pass 被多种模式复用 def _apply_single_impl(self, main_program, startup_program, pass_ctx): attrs = pass_ctx._attrs - print("pass loss program id:", id(attrs['loss'].block.program)) - print("pass main program id:", id(main_program)) ps_mode = attrs['ps_mode'] if ps_mode == DistributedMode.GEO: send_ctx = get_geo_trainer_send_context(attrs) # geo 模式 + elif attrs['is_heter_ps_mode'] == True: + print("is_heter_ps_mode in append_send_ops_pass!!") + send_ctx = get_the_one_send_context(attrs, split_dense_table=True) else: send_ctx = get_the_one_send_context(attrs) # async、sync 等各种模式 - logger.info("send_ctx: {}".format(send_ctx)) dummys = [] for merged_name, send in send_ctx.items(): if send.is_sparse() and ps_mode != DistributedMode.GEO: continue if send.program_id() != id(attrs['loss'].block.program): continue - logger.info('merged_name, send: {}, {}'.format(merged_name, send)) is_sparse = 1 if send.is_sparse() else 0 is_sparse = 2 if send.is_distributed() else is_sparse dummys.append( self._append_send_op(main_program, send.origin_varnames(), merged_name, is_sparse, send.table_id(), ps_mode)) - logger.info('ps trainer pass - ps mode: {}'.format(ps_mode)) - logger.info('dummys: {}'.format(dummys)) if ps_mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]: - logger.info('insert send_barrier_op') trainer_id = get_role_id(attrs['role_maker']) self._append_barrier_op(main_program, dummys, trainer_id) @@ -453,6 +450,8 @@ class DistributedOpsPass(PassBase): attrs = pass_ctx._attrs pull_sparse_ops, push_sparse_ops, use_cvm_op = 
self._get_pull_sparse_ops( main_program, attrs) + print("is_heter_ps_mode in distributed_ops_pass {}?".format(attrs[ + 'is_heter_ps_mode'])) send_ctx = get_the_one_send_context( attrs, split_dense_table=attrs['is_heter_ps_mode']) self._pull_sparse_fuse(main_program, pull_sparse_ops, attrs, send_ctx) @@ -505,7 +504,6 @@ class DeleteOptimizesPass(PassBase): persistable=True) def _apply_single_impl(self, main_program, startup_program, pass_ctx): - print("delete_optimizer_pass") attrs = pass_ctx._attrs optimizer_ops = get_optimize_ops(main_program) lr_ops = get_lr_ops(main_program) @@ -833,9 +831,9 @@ class SplitHeterWorkerOpsPass(PassBase): block_var_detail, current_device, False) # add send op - send_grad_var_list = add_heter_send_op(program, heter_program, - heter_block_bp, - block_var_detail[stage_id - 1]) + send_grad_var_list = add_send_op( + program, heter_block_bp, + block_var_detail[stage_id - 1]["backward"]["persistables"]) # add step conter send_input_vars = [] @@ -909,7 +907,7 @@ class SplitTrainerOpsPass(PassBase): first_op_idx = all_op.index(op) break assert first_op_idx != -1 - self._delete_same_ops(program.global_block(), ops_list) + delete_same_ops(program.global_block(), ops_list) entrance_var = [] role_maker = attrs['role_maker'] @@ -939,17 +937,6 @@ class SplitTrainerOpsPass(PassBase): return entrance_var - def _delete_same_ops(self, block, ops): - for op in ops: - try: - for origin_op in block.ops: - if str(origin_op) == str(op): - idx = list(block.ops).index(origin_op) - block._remove_op(idx) - break - except Exception as e: - print(e) - def _remove_var_pair_by_grad(self, var_name, attrs): for index, pair in enumerate(attrs['merged_variables_pairs']): var = pair[0] @@ -1019,7 +1006,7 @@ class SplitTrainerOpsPass(PassBase): grad_to_block_id = [] bp_ops_list = program_block_ops_list[0]["backward"] - self._delete_same_ops(program.global_block(), bp_ops_list) + delete_same_ops(program.global_block(), bp_ops_list) delete_trainer_useless_var(program, static_var) backward_block = create_backward_block(program, origin_program, bp_ops_list, block_var_detail) @@ -1093,12 +1080,13 @@ class SetHeterPipelineOptPass(PassBase): num_microbatches = attrs['user_defined_strategy'].pipeline_configs[ 'accumulate_steps'] - attrs['origin_startup_program']._heter_pipeline_opt = { + startup_program._heter_pipeline_opt = { "startup_program": startup_program, "pipeline_stage": int(role_maker._get_stage_id()) - 1, "heter_place": role_maker._heter_device(), + "is_fl_mode": 1 } - attrs['origin_main_program']._heter_pipeline_opt = { + main_program._heter_pipeline_opt = { "trainer": "HeterPipelineTrainer", "device_worker": "HeterSection", "trainers": @@ -1109,4 +1097,313 @@ class SetHeterPipelineOptPass(PassBase): "section_program": main_program, "num_microbatches": num_microbatches, "heter_place": role_maker._heter_device(), + "is_fl_mode": 1 } + + +@register_pass("split_fl_ops_pass") +class SplitFlOpsPass(PassBase): + def __init__(self): + super(SplitFlOpsPass, self).__init__() + self.PART_A_DEVICE_FlAG = 'gpu:0' + self.PART_A_JOINT_OP_DEVICE_FlAG = 'gpu:2' + self.PART_B_DEVICE_FlAG = 'gpu:1' + self.PART_B_JOINT_OP_DEVICE_FlAG = 'gpu:3' + + def _check_self(self): + return True + + def _check_conflict(self, other_pass): + return True + + def _insert_encrypt_op(self): + pass + + def _insert_decrypt_op(self): + pass + + def _clear_op_device_flag(self, program): + for block in program.blocks: + for op in block.ops: + device = op.attr(OP_DEVICE_KEY) + op._set_attr(OP_DEVICE_KEY, '') if device != '' else 
None + + def _split_fl_program(self): + self.partA_ops = [] + self.partB_ops = [] + party_program_map = defaultdict(Program) + block = self.ori_main_program.block(0) + for op in block.ops: + device = op.attr(OP_DEVICE_KEY) + if device == self.PART_A_DEVICE_FlAG or device == '' or device == self.PART_A_JOINT_OP_DEVICE_FlAG: + program = party_program_map['a'] + self.partA_ops.append(op) + elif device == self.PART_B_DEVICE_FlAG or device == self.PART_B_JOINT_OP_DEVICE_FlAG: + program = party_program_map['b'] + self.partB_ops.append(op) + op_desc = op.desc + ap_op = program.global_block().desc.append_op() + ap_op.copy_from(op_desc) + ap_op._set_attr(OP_DEVICE_KEY, device) + + for key in ['a', 'b']: + program = party_program_map[key] + program._sync_with_cpp() + + return party_program_map + + def _insert_partA_communicate_op(self, block, idx): + comm_info = "forward_joint_{}_{}@fl_ps".format(1, 2) + block._insert_op( + idx, + type='send_and_recv', + inputs={'X': self.partA_to_partB_tensor}, + outputs={'Out': []}, + attrs={ + 'mode': 'forward', # mode 直接关联前向和反向 channel 选择 + 'send_var_name': + self.partA_to_partB_tensor_name + ["microbatch_id"], + 'recv_var_name': [], + 'message_name': comm_info, + 'next_endpoints': + get_next_stage_trainers(self.role_maker), # partB_endpoints + 'previous_endpoints': + get_previous_stage_trainers(self.role_maker), + 'trainer_id': get_role_id(self.role_maker), # global id + RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE + }) + return + + def _insert_partB_communicate_op(self, block, idx): + comm_info = ("backward_joint_{}_{}@fl_ps".format(2, 1)) + block._insert_op( + idx, + type='send_and_recv', + inputs={'X': self.partB_to_partA_grad}, + outputs={'Out': []}, + attrs={ + 'mode': 'backward', + 'send_var_name': + self.partB_to_partA_grad_name + ["microbatch_id"], + 'recv_var_name': [], + 'message_name': comm_info, + 'next_endpoints': + get_next_stage_trainers(self.role_maker), # partA_endpoints + 'previous_endpoints': + get_previous_stage_trainers(self.role_maker), + 'trainer_id': get_role_id(self.role_maker), # global id + RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE + }) + return + + def _create_var_for_block(self, vars, block): + for var in vars: + if block._find_var_recursive(str(var)): + continue + source_var = self.ori_main_block._var_recursive(str(var)) + if isinstance(var, Parameter): + dest_var = block.create_parameter( + name=source_var.name, + shape=source_var.shape, + dtype=source_var.dtype, + type=source_var.type, + lod_level=source_var.lod_level, + stop_gradient=source_var.stop_gradient, + trainable=source_var.trainable, + optimize_attr=source_var.optimize_attr, + regularizer=source_var.regularizer, + error_clip=source_var.error_clip) + else: + dest_var = block._clone_variable(source_var, False) + dest_var.stop_gradient = source_var.stop_gradient + if hasattr(source_var, 'is_distributed'): + dest_var.is_distributed = source_var.is_distributed + + def _get_block_by_idx(self, op_list, program, block_idx): + if block_idx < len(program.blocks): + new_block = program.block(block_idx) + else: + new_block = program._create_block() + for _, op in enumerate(op_list): + ap_op = new_block.desc.append_op() + ap_op.copy_from(op.desc) + ap_op._set_attr(OP_DEVICE_KEY, op.attr(OP_DEVICE_KEY)) + vars = op.desc.input_arg_names() + op.desc.output_arg_names() + self._create_var_for_block(vars, new_block) + new_block._sync_with_cpp() + return new_block + + def _find_joint_forward_op(self, block, flag): + op_idx = 0 + for op in block.ops: + if is_forward_op(op) and 
op.attr(OP_DEVICE_KEY) == flag: + return op_idx + else: + op_idx += 1 + return op_idx + + def _find_joint_backward_op(self, block, flag): + op_idx = 0 + for op in block.ops: + if is_backward_op(op) and op.attr(OP_DEVICE_KEY) == flag: + return op_idx + else: + op_idx += 1 + return op_idx + + def _get_partB_to_partA_grad(self, block, flag): + op_idx = self._find_joint_backward_op(block, flag) + op = block.ops[op_idx] + vars1 = op.desc.input_arg_names() + op_idx = self._find_joint_forward_op(block, flag) + op = block.ops[op_idx] + vars2 = op.desc.output_arg_names() + self.partB_to_partA_grad_name = list(set(vars1) - set(vars2)) + self.partB_to_partA_grad = [] + for var_name in self.partB_to_partA_grad_name: + self.partB_to_partA_grad.append(self.ori_main_block.var(var_name)) + + def _find_dense_grad_vars(self, bp_op_list): + program = self.ori_main_program + bp_op_input, bp_op_output = find_ops_list_input_output(program, + bp_op_list) + return (screen_persistables(program, bp_op_input) + screen_persistables( + program, bp_op_output)) + + def _get_partA_program(self, block): + # 1. create block 0 + # 1.1 insert send op + op_idx = self._find_joint_forward_op(block, + self.PART_A_JOINT_OP_DEVICE_FlAG) + op_list = [] + for i in range(len(block.ops)): + op = block.ops[i] + op_list.append(op) + if i == op_idx: + out_name = op.desc.output_arg_names()[0] + self.partA_to_partB_tensor_name = op.desc.output_arg_names() + self.partA_to_partB_tensor = self.ori_main_block.var(out_name) + break + first_block = self._get_block_by_idx(op_list, self.partA_program, 0) + self._insert_partA_communicate_op(first_block, op_idx + 1) + # logger.info('partA-first_block:{}'.format(first_block)) + + # 2. create block 1 + bp_op_list = get_bp_op_list(block) + push_sparse_op_list = get_distributed_push_sparse_op_list(block) + # logger.info('bp_op_list: {}'.format(bp_op_list)) + second_block = self._get_block_by_idx(bp_op_list + push_sparse_op_list, + self.partA_program, 1) + # 2.1. insert partA recv op + block_input_flag = "backward_joint_{}_{}@fl_ps".format(2, 1) + grad_to_block_id = block_input_flag + ":" + str(second_block.idx) + attrs = { + "message_to_block_id": [grad_to_block_id], + "optimize_blocks": [second_block], + "endpoint": get_trainer_endpoint(self.role_maker), ## + "fanin": 0, + "pserver_id": get_role_id(self.role_maker), + "distributed_mode": self.ps_mode, + "rpc_exec_thread_num": int(os.getenv("CPU_NUM", 32)), + RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE + } + second_block._insert_op( + index=0, + type='heter_listen_and_serv', + inputs={'X': []}, + outputs={}, + attrs=attrs) + # 2.2 insert push dense grad op + send_ops = find_send_op(self.ori_main_program) # push dense + delete_same_ops(block, send_ops) + dense_grad_vars = self._find_dense_grad_vars(bp_op_list) + add_send_op(self.ori_main_program, second_block, dense_grad_vars) + # logger.info('partA-second_block:{}'.format(second_block)) + + def _get_partB_program(self, block): + op_idx1 = self._find_joint_forward_op( + block, self.PART_B_JOINT_OP_DEVICE_FlAG) # elementwise_add op + op_idx2 = self._find_joint_backward_op(block, + self.PART_B_JOINT_OP_DEVICE_FlAG) + op_cnt = 0 + op_list1 = [] + op_list2 = [] + op_list3 = [] + for op in block.ops: + if op_cnt < op_idx1: + op_list1.append(op) + elif op_cnt <= op_idx2: + op_list2.append(op) + else: + op_list3.append(op) + op_cnt += 1 + + # 1. create block 0 + first_block = self._get_block_by_idx(op_list1, self.partB_program, 0) + + # 2. 
create block 1 + second_block = self._get_block_by_idx(op_list2, self.partB_program, 1) + # 2.1 insert send op + self._insert_partB_communicate_op(second_block, len(op_list2)) + # 2.2 insert remain ops + second_block = self._get_block_by_idx(op_list3, self.partB_program, 1) + # 2.3 insert push dense grad op + bp_op_list = get_bp_op_list(second_block) + dense_grad_vars = self._find_dense_grad_vars(bp_op_list) + add_send_op(self.ori_main_program, second_block, dense_grad_vars) + + # 3. insert partB recv op + block_input_flag = "forward_joint_{}_{}@fl_ps".format(1, 2) + grad_to_block_id = block_input_flag + ":" + str(second_block.idx) + attrs = { + "message_to_block_id": [grad_to_block_id], + "optimize_blocks": [second_block], ## what to do? + "endpoint": get_heter_worker_endpoint(self.role_maker), + "fanin": len(get_previous_stage_trainers(self.role_maker)), + "pserver_id": 1, # TODO + "distributed_mode": self.ps_mode, + "rpc_exec_thread_num": int(os.getenv("CPU_NUM", 32)), + RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE + } + first_block._insert_op( + index=len(op_list1), + type="heter_listen_and_serv", + inputs={'X': []}, + outputs={}, + attrs=attrs) + + #logger.info('partB-first_block:{}'.format(first_block)) + #logger.info('partB-second_block:{}'.format(second_block)) + + def _apply_single_impl(self, main_program, startup_program, pass_ctx): + attrs = pass_ctx._attrs + self.role_maker = attrs['role_maker'] + self.ps_mode = attrs['ps_mode'] + self.is_part_b = attrs['is_heter_worker'] # TODO + self.ori_main_program = main_program + self.ori_main_block = main_program.block(0) + + party_program_map = self._split_fl_program() + + prog_a = party_program_map['a'] + _main_file = ps_log_root_dir + '6_fl_A_main_program.prototxt' + debug_program(_main_file, prog_a) + self._get_partB_to_partA_grad(prog_a.global_block(), + self.PART_A_JOINT_OP_DEVICE_FlAG) + + prog_b = party_program_map['b'] + _main_file = ps_log_root_dir + '6_fl_B_main_program.prototxt' + debug_program(_main_file, prog_b) + + if not self.is_part_b: + self.partA_program = framework.Program() + self._get_partA_program(prog_a.global_block()) + pass_ctx._attrs['part_a_main_program'] = self.partA_program + self._clear_op_device_flag(self.partA_program) + check_program(self.partA_program) + else: + self.partB_program = framework.Program() + self._get_partB_program(prog_b.global_block()) + pass_ctx._attrs['part_b_main_program'] = self.partB_program + self._clear_op_device_flag(self.partB_program) + check_program(self.partB_program) diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index 888d517116a..2ba9b6b9c5a 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -732,6 +732,8 @@ class PsDescBuilder(object): self.is_heter_ps_mode = context['is_heter_ps_mode'] self.use_ps_gpu = context['use_ps_gpu'] self.barrier_table_id = None + print("is_heter_ps_mode in the_one_ps.py? 
{}".format( + self.is_heter_ps_mode)) self.send_ctx = get_the_one_send_context( self.context, use_origin_program=True, @@ -772,6 +774,7 @@ class PsDescBuilder(object): self.tensor_tables = self._get_tensor_tables() tables.extend(self.tensor_tables) tables.append(globals()['BarrierTable'](self.context, len(tables))) + print("test_fl_ps: tables len: {}".format(len(tables))) return tables def _get_service(self): @@ -864,7 +867,7 @@ class TheOnePSRuntime(RuntimeBase): scope = scopes[idx] table_id = ctx.table_id() var_names = recv_map[table_id] - # print("init params:", idx, table_id, var_names) + #print("init params:", idx, table_id, var_names) self._worker.push_dense_params(scope, table_id, var_names) def _pull_all_dense(self, scopes, send_ctx, recv_map): @@ -875,7 +878,7 @@ class TheOnePSRuntime(RuntimeBase): scope = scopes[idx] table_id = ctx.table_id() var_names = recv_map[table_id] - # print("pull all dense:", idx, table_id, var_names) + #print("pull all dense:", idx, table_id, var_names) self._worker.pull_dense_params(scope, table_id, var_names) def _init_params(self, program, scope, send_ctx, recv_map): @@ -902,7 +905,8 @@ class TheOnePSRuntime(RuntimeBase): def _init_worker(self, scopes=None): worker_desc = self.ps_desc_builder.build_worker_desc() - + #with open("test_fl_ps_worker_desc", "w") as f: + # f.write(worker_desc) if self.context['use_ps_gpu']: main_program = self.context['loss'].block.program if not main_program._fleet_opt: @@ -955,7 +959,8 @@ class TheOnePSRuntime(RuntimeBase): role_id = get_role_id(self.role_maker) self._worker.init_worker(proto_txt, self.string_hosts, role_id) - if self.context['ps_mode'] == DistributedMode.GEO: + if self.context[ + 'ps_mode'] == DistributedMode.GEO or self.is_heter_ps_mode: self._communicator = Communicator( trainer_config.mode, kwargs, trainer_config.get_communicator_flags()) @@ -1010,19 +1015,27 @@ class TheOnePSRuntime(RuntimeBase): self.scopes = scopes if not is_test: - if self.context['ps_mode'] == DistributedMode.GEO: + if self.context[ + 'ps_mode'] == DistributedMode.GEO or self.is_heter_ps_mode == True: self._communicator.init_params(init_params) else: if not self.context['use_ps_gpu']: if role_id == 0: + print("entering self._init_all_params()") self._init_all_params(scopes, send_ctx, dense_map) - fleet.util.barrier() + fleet.util.barrier() # 保证 0 号 worker 参数 push_dense_param over + if not self.context['use_ps_gpu']: - self._pull_all_dense(scopes, send_ctx, dense_map) + if self.is_heter_ps_mode == True and not self.role_maker._is_first_worker( + ): + self._communicator.pull_dense(init_params) + else: + self._pull_all_dense(scopes, send_ctx, dense_map) fleet.util.barrier() - if self.context['ps_mode'] == DistributedMode.GEO: + if self.context[ + 'ps_mode'] == DistributedMode.GEO or self.is_heter_ps_mode == True: if not self._communicator.is_running(): self._communicator.start() else: @@ -1031,7 +1044,6 @@ class TheOnePSRuntime(RuntimeBase): launch_barrier = dist_strategy.a_sync_configs["launch_barrier"] launch_barrier_flag = int(os.getenv("FLAGS_LAUNCH_BARRIER", "1")) if launch_barrier and launch_barrier_flag: - # for trainer wait server ready wait_server_ready(self.role_maker._get_pserver_endpoints()) if self.is_heter_ps_mode and self.role_maker._get_next_trainers( ) != []: @@ -1043,12 +1055,14 @@ class TheOnePSRuntime(RuntimeBase): next_trainers = [] if self.role_maker._get_next_trainers() != []: next_trainers = self.role_maker._get_next_trainers() - self._heter_client = HeterClient(next_trainers, - previous_trainers, - 
self.role_maker._role_id()) + self._heter_client = HeterClient( + next_trainers, previous_trainers, + self.role_maker._role_id()) # --> HeterClient::GetInstance def _init_server(self, dirname=None, var_names=None, **kwargs): server_desc = self.ps_desc_builder.build_server_desc() + #with open("test_fl_ps_server_desc", "w") as f: + # f.write(server_desc) role_id = get_role_id(self.role_maker) trainers = get_trainers(self.role_maker) if self.is_heter_ps_mode: diff --git a/python/paddle/distributed/ps/utils/ps_factory.py b/python/paddle/distributed/ps/utils/ps_factory.py index 701ae8be6cb..bea102c837e 100755 --- a/python/paddle/distributed/ps/utils/ps_factory.py +++ b/python/paddle/distributed/ps/utils/ps_factory.py @@ -33,10 +33,9 @@ class PsProgramBuilderFactory(object): return globals()['GeoPsProgramBuilder'](pass_ctx) elif attrs['use_ps_gpu']: return globals()['GpuPsProgramBuilder'](pass_ctx) - elif attrs['is_heter_ps_mode']: + elif attrs['is_heter_ps_mode'] and not attrs['is_fl_ps_mode']: return globals()['HeterAsyncPsProgramBuilder'](pass_ctx) - elif 'is_fl_ps_mode' in attrs and attrs[ - 'is_fl_ps_mode'] == DistributedMode.FL: + elif 'is_fl_ps_mode' in attrs and attrs['is_fl_ps_mode']: return globals()['FlPsProgramBuilder'](pass_ctx) elif attrs['ps_mode'] == DistributedMode.SYNC: return globals()['CpuSyncPsProgramBuilder'](pass_ctx) diff --git a/python/paddle/distributed/ps/utils/ps_program_builder.py b/python/paddle/distributed/ps/utils/ps_program_builder.py index f1d6a1f04a3..9e063716758 100755 --- a/python/paddle/distributed/ps/utils/ps_program_builder.py +++ b/python/paddle/distributed/ps/utils/ps_program_builder.py @@ -23,6 +23,9 @@ class PsProgramBuilder(object): self.pass_ctx = pass_ctx self.attrs = self.pass_ctx._attrs self.loss = self.attrs['loss'] + self.origin_startup_program = self.attrs['origin_startup_program'] + self.main_program = self.attrs['origin_main_programs'] + self.cloned_main = self.attrs['cloned_main'] self.cloned_startup = self.attrs['cloned_startup'] @@ -30,6 +33,7 @@ class PsProgramBuilder(object): self.use_heter_ps = self.attrs['is_heter_ps_mode'] self.is_worker = self.attrs['is_worker'] self.is_heter_worker = self.attrs['is_heter_worker'] + self.is_server = self.attrs['is_server'] self.ps_mode = self.attrs['ps_mode'] self.launch_barrier = self.attrs['launch_barrier'] @@ -67,9 +71,10 @@ class PsProgramBuilder(object): def _build_programs(self): if self.attrs['is_worker']: - logger.info("start building trainer program") self._build_trainer_programs() fluid.framework.switch_startup_program(self.cloned_startup) + print("fluid.default_startup_program: {}".format( + fluid.default_startup_program)) # print("ps_program_build before =", id(self.loss.block.program)) self._build_trainer_desc() self.loss.block.program = self.cloned_main @@ -81,7 +86,6 @@ class PsProgramBuilder(object): # self.loss.block.program._fleet_opt) elif self.attrs['is_server']: - logger.info("start building pserver program") self._build_pserver_programs() self.loss.block.program = self.attrs['_main_server'] fluid.framework.switch_startup_program(self.attrs[ @@ -90,7 +94,6 @@ class PsProgramBuilder(object): class GeoPsProgramBuilder(PsProgramBuilder): # 仅 CPU 模式 def __init__(self, pass_ctx): - logger.info("start building geo-ps program") super(GeoPsProgramBuilder, self).__init__(pass_ctx) if self.ps_mode != DistributedMode.GEO: raise ValueError("ps mode: {} not matched {}", @@ -105,8 +108,6 @@ class GeoPsProgramBuilder(PsProgramBuilder): # 仅 CPU 模式 if self.launch_barrier and 
self.launch_barrier_flag: wait_server_ready(self.server_endpoints) - return - def _build_pserver_programs(self): add_listen_and_serv_pass = new_pass('add_listen_and_serv_pass', self.attrs) @@ -118,8 +119,6 @@ class GeoPsProgramBuilder(PsProgramBuilder): # 仅 CPU 模式 class CpuSyncPsProgramBuilder(PsProgramBuilder): def __init__(self, pass_ctx): super(CpuSyncPsProgramBuilder, self).__init__(pass_ctx) - if self.ps_mode == DistributedMode.SYNC: - logger.info("start building cpu-sync-ps program") if self.ps_mode != DistributedMode.SYNC and self.ps_mode != DistributedMode.ASYNC: raise ValueError("ps mode: {} not matched {}", format(self.ps_mode, "PsProgramBuilder")) @@ -161,7 +160,6 @@ class CpuSyncPsProgramBuilder(PsProgramBuilder): class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder): def __init__(self, pass_ctx): - logger.info("start building cpu-async-ps program") super(CpuAsyncPsProgramBuilder, self).__init__(pass_ctx) def _build_trainer_desc(self): @@ -198,7 +196,6 @@ class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder): class GpuPsProgramBuilder(PsProgramBuilder): def __init__(self, pass_ctx): - logger.info("start building gpu-ps program") super(GpuPsProgramBuilder, self).__init__(pass_ctx) def _build_trainer_programs(self): @@ -231,12 +228,7 @@ class GpuPsProgramBuilder(PsProgramBuilder): class HeterAsyncPsProgramBuilder(PsProgramBuilder): def __init__(self, pass_ctx): - logger.info("start building heter-async-ps program") super(HeterAsyncPsProgramBuilder, self).__init__(pass_ctx) - if self.use_ps_gpu or self.ps_mode == DistributedMode.GEO or self.attrs[ - 'is_heter_ps_mode'] == False: - raise ValueError("ps mode: {} not matched {}", - format(self.ps_mode, "HeterAsyncPsProgramBuilder")) def _build_trainer_programs(self): add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass", @@ -296,15 +288,91 @@ class HeterAsyncPsProgramBuilder(PsProgramBuilder): '_startup_server']) -class FlPsProgramBuilder(PsProgramBuilder): +class FlPsProgramBuilder(HeterAsyncPsProgramBuilder): def __init__(self, pass_ctx): super(FlPsProgramBuilder, self).__init__(pass_ctx) def _build_trainer_programs(self): - pass + _main_file = ps_log_root_dir + '0_fl_worker_main_program.prototxt' + #debug_program(_main_file, self.cloned_main) + + distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs) + distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx) + + _main_file = ps_log_root_dir + '1_fl_worker_main_program.prototxt' + #debug_program(_main_file, self.cloned_main) + + delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs) + delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx) + + _main_file = ps_log_root_dir + '2_fl_worker_main_program.prototxt' + #debug_program(_main_file, self.cloned_main) + + append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs) + append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx) + + _main_file = ps_log_root_dir + '3_fl_worker_main_program.prototxt' + #debug_program(_main_file, self.cloned_main) + + delete_extra_optimizer_pass = new_pass("delete_extra_optimizer_pass", + self.attrs) + delete_extra_optimizer_pass.apply([self.attrs['origin_main_program']], + [self.cloned_startup], self.pass_ctx) + + _main_file = ps_log_root_dir + '4_fl_worker_main_program.prototxt' + #debug_program(_main_file, self.cloned_main) + + fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs) + fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx) + + _main_file = ps_log_root_dir + 
'5_fl_worker_main_program.prototxt' + #debug_program(_main_file, self.cloned_main) + + split_trainer_ops_pass = new_pass("split_fl_ops_pass", self.attrs) + split_trainer_ops_pass.apply([self.cloned_main], [None], self.pass_ctx) + + if not self.is_heter_worker: + self.part_a_program = self.pass_ctx._attrs['part_a_main_program'] + self.cloned_main = self.part_a_program + _main_file = ps_log_root_dir + '8_fl_A_main_program.prototxt' + debug_program(_main_file, self.cloned_main) + else: + self.part_b_program = self.pass_ctx._attrs['part_b_main_program'] + self.cloned_main = self.part_b_program + _main_file = ps_log_root_dir + '8_fl_B_main_program.prototxt' + debug_program(_main_file, self.cloned_main) + + set_heter_pipeline_opt_pass = new_pass('set_heter_pipeline_opt_pass', + self.attrs) + set_heter_pipeline_opt_pass.apply([self.cloned_main], + [self.cloned_startup], self.pass_ctx) + + self.attrs['origin_startup_program'] = self.cloned_startup + self.attrs['origin_main_program'] = self.cloned_main + + if not self.is_heter_worker: + _main_file = ps_log_root_dir + 'final_fl_A_main_program.prototxt' + debug_program(_main_file, self.attrs['origin_main_program'] + ._heter_pipeline_opt['section_program']) + else: + _main_file = ps_log_root_dir + 'final_fl_B_main_program.prototxt' + debug_program(_main_file, self.attrs['origin_main_program'] + ._heter_pipeline_opt['section_program']) + + return def _build_pserver_programs(self): - pass + self.loss.block.program = self.attrs['_main_server'] def _build_programs(self): - pass + if not self.is_server: + self._build_trainer_programs() + fluid.framework.switch_startup_program(self.cloned_startup) + fluid.framework.switch_main_program(self.cloned_main) + print("fluid.default_startup_program: {}".format( + fluid.default_startup_program()._heter_pipeline_opt)) + else: + self._build_pserver_programs() + fluid.framework.switch_startup_program(self.attrs[ + '_startup_server']) + fluid.framework.switch_main_program(self.attrs['_main_server']) diff --git a/python/paddle/distributed/ps/utils/public.py b/python/paddle/distributed/ps/utils/public.py index 7acfd6cfe19..223c4e69d9d 100755 --- a/python/paddle/distributed/ps/utils/public.py +++ b/python/paddle/distributed/ps/utils/public.py @@ -37,10 +37,12 @@ LEARNING_RATE_DECAY_COUNTER = "@LR_DECAY_COUNTER@" OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName() RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName() RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC +op_role = core.op_proto_and_checker_maker.OpRole op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName() LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize backward = core.op_proto_and_checker_maker.OpRole.Backward +OP_DEVICE_KEY = core.op_proto_and_checker_maker.kOpDeviceAttrName() DEVICE_LIST = ["cpu", "gpu", "xpu"] COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"] @@ -91,8 +93,7 @@ class TrainerRuntimeConfig(object): num_threads = os.getenv("CPU_NUM", "1") send_queue_size = num_threads k_steps = valid_strategy.a_sync_configs["k_steps"] - logger.info("ps mode in strategy: {}, {}".format( - valid_strategy.a_sync, valid_strategy.a_sync_configs["k_steps"])) + if not valid_strategy.a_sync and k_steps == 0: self.mode = DistributedMode.SYNC @@ -238,17 +239,11 @@ def get_ps_endpoints(role_maker): def get_heter_worker_endpoint(role_maker): - try: - return 
role_maker._get_heter_worker_endpoint() - except Exception: - return role_maker.get_heter_worker_endpoint() + return role_maker._get_heter_worker_endpoint() def get_trainer_endpoint(role_maker): - try: - return role_maker._get_trainer_endpoint() - except Exception: - return role_maker.get_trainer_endpoint() + return role_maker._get_trainer_endpoint() def get_previous_stage_trainers(role_maker): @@ -339,8 +334,8 @@ def get_dense_send_context(program, var_numel += reduce(lambda x, y: x * y, var.shape) grad_name = "Dense@GRAD_" + str(idx) aggregate = True - print("public get_dense_send_context dense_table:", grad_name, - var_numel, origin_varnames) + # print("public get_dense_send_context dense_table:", grad_name, + # var_numel, origin_varnames) from paddle.fluid.core import CommContext dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], origin_varnames, trainer_id, @@ -362,8 +357,8 @@ def get_dense_send_context(program, var_numel += reduce(lambda x, y: x * y, var.shape) grad_name = "DataNorm@GRAD_" + str(idx) aggregate = True - print("public get_dense_send_context data_norm table:", grad_name, - var_numel, origin_varnames) + # print("public get_dense_send_context data_norm table:", grad_name, + # var_numel, origin_varnames) from paddle.fluid.core import CommContext data_norm_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], origin_varnames, trainer_id, @@ -441,14 +436,15 @@ def _step_ctx(idx, role_maker): def get_the_one_send_context(context, - split_dense_table=False, use_origin_program=False, + split_dense_table=False, ep_list=None): if ep_list is None: ep_list = ["127.0.0.1:6071"] send_ctx = {} trainer_id = get_role_id(context['role_maker']) origin_programs = context['origin_main_programs'] + print("is_heter_ps_mode? 
{}".format(split_dense_table)) idx = 0 distibuted_varnames = get_sparse_tablenames(origin_programs, True) @@ -471,8 +467,8 @@ def get_the_one_send_context(context, shape = list(var.shape) shape[0] = 0 if is_distributed else shape[0] - # print("public get_the_one_send_context sparse:", grad_name, - # splited_varname, shape) + #print("public get_the_one_send_context sparse:", grad_name, + # splited_varname, shape) if grad_name in send_ctx: continue from paddle.fluid.core import CommContext @@ -1094,14 +1090,13 @@ def block_append_op(program, origin_program, block, op): else: # for grad op op_desc = op.desc - op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName() backward = core.op_proto_and_checker_maker.OpRole.Backward device_attr_name = core.op_proto_and_checker_maker.kOpDeviceAttrName() # append grad op new_op_desc = block.desc.append_op() new_op_desc.copy_from(op_desc) - new_op_desc._set_attr(op_role_attr_name, backward) + new_op_desc._set_attr(RPC_OP_ROLE_ATTR_NAME, backward) # set device gard if op.desc.has_attr(device_attr_name): @@ -1422,7 +1417,7 @@ def find_op_input_output(program, block, op): return input_var_list, output_var_list -def add_heter_send_op(program, heter_program, block, block_var_detail): +def add_send_op(program, block, _vars): def _get_send_op_dict(): send_op_dict = {} send_op_list = find_send_op(program) @@ -1436,7 +1431,7 @@ def add_heter_send_op(program, heter_program, block, block_var_detail): send_grad_var_list = [] send_op_dict = _get_send_op_dict() table_dict = {} - for persistable_var in block_var_detail["backward"]["persistables"]: + for persistable_var in _vars: if "@GRAD" not in persistable_var: continue if "GRAD" != persistable_var.split("@")[-1]: @@ -1482,6 +1477,7 @@ def get_vars_name_in_block(block): return vars_name_list +# reserve static_var def delete_trainer_useless_var(program, static_var): static_var = list(set(static_var)) program_useful_var_list = [] @@ -1525,6 +1521,67 @@ def create_backward_block(program, origin_program, bp_ops_list, return heter_block +def is_backward_op(op): + return op_role_attr_name in op.attr_names and ( + int(op.attr(op_role_attr_name)) & int(op_role.Backward)) + + +def is_forward_op(op): + return op_role_attr_name in op.attr_names and ( + int(op.attr(op_role_attr_name)) == int(op_role.Forward)) + + +def is_push_sparse_op(op): + return op.type == 'distributed_push_sparse' + + +def get_distributed_push_sparse_op_list(block): + push_sparse_op_list = [] + for op_idx in range(block.desc.op_size()): + op = block.ops[op_idx] + if is_push_sparse_op(op): + push_sparse_op_list.append(op) + return push_sparse_op_list + + +def get_bp_op_list(block): + bp_op_list = [] + for op_idx in range(block.desc.op_size()): + op = block.ops[op_idx] + if is_backward_op(op): + bp_op_list.append(op) + return bp_op_list + + +def delete_same_ops(block, ops): + for op in ops: + try: + for origin_op in block.ops: + if str(origin_op) == str(op): + idx = list(block.ops).index(origin_op) + block._remove_op(idx) + break + except Exception as e: + print(e) + + +def check_program(program): + block_idx = 0 + for block in program.blocks: + for op in block.ops: + input_var_names = op.desc.input_arg_names() + output_var_names = op.desc.output_arg_names() + for var_name in (input_var_names + output_var_names): + if not block._find_var_recursive(str(var_name)): + raise ValueError( + 'var: {} needed by op is not found in block: {}'.format( + str(var_name), block_idx)) + block_idx += 1 + print('program checked valid') + + def debug_program(file, 
program): + # py >= 3.2 + os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w+') as f: f.write(str(program)) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py old mode 100644 new mode 100755 index 164545d0a05..862b18dc5a2 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1326,6 +1326,8 @@ class Executor(object): use_program_cache=use_program_cache) if isinstance(program, Program) and program._heter_pipeline_opt: + #print("program._heter_pipeline_opt: {}".format( + # program._heter_pipeline_opt)) ## change default executor heter_place = program._heter_pipeline_opt["heter_place"] heter_place = framework._get_paddle_place(heter_place) @@ -1334,6 +1336,7 @@ class Executor(object): self._default_executor = core.Executor(p) # TODO(zhangminxu): support heterps pipeline training using exe.run if "startup_program" in program._heter_pipeline_opt: + #print("get startup_program from _pipeline_opt") program = program._heter_pipeline_opt["startup_program"] if isinstance(program, Program) and \ @@ -1391,6 +1394,7 @@ class Executor(object): return False compiled = isinstance(program, compiler.CompiledProgram) + # print("compiled is : {}".format(compiled)) # NOTE(zhiqiu): do not support compiled program now if compiled: return False @@ -1778,24 +1782,26 @@ class Executor(object): dataset.set_use_var(data_vars) elif program._heter_pipeline_opt is not None: stage_id = program._heter_pipeline_opt["pipeline_stage"] + #print("test_fl_stage_id: {}".format(stage_id)) heter_place = program._heter_pipeline_opt["heter_place"] if stage_id != 0: - import paddle - if dataset is not None: - raise RuntimeError( - "dataset should be None for heter pipeline mode") - # The following fake dataset is created to call - # the _prepare_trainer api, and it is meaningless. - data_vars = [] - for var in program.global_block().vars.values(): - if var.is_data: - data_vars.append(var) - dataset = paddle.fluid.DatasetFactory().create_dataset( - 'InMemoryDataset') - dataset.set_batch_size(1) - dataset.set_thread(1) - dataset.set_filelist(['None']) - dataset.set_use_var(data_vars) + if "is_fl_mode" not in program._heter_pipeline_opt: + import paddle + if dataset is not None: + raise RuntimeError( + "dataset should be None for heter pipeline mode") + # The following fake dataset is created to call + # the _prepare_trainer api, and it is meaningless. 
+ data_vars = [] + for var in program.global_block().vars.values(): + if var.is_data: + data_vars.append(var) + dataset = paddle.fluid.DatasetFactory().create_dataset( + 'InMemoryDataset') + dataset.set_batch_size(1) + dataset.set_thread(1) + dataset.set_filelist(['None']) + dataset.set_use_var(data_vars) else: if dataset is None: raise RuntimeError( @@ -1855,10 +1861,11 @@ class Executor(object): # warning if dataset not set psgpu in psgpu mode if dataset.use_ps_gpu is False and trainer.proto_desc.use_ps_gpu: logging.warning("dataset should call set_use_ps_gpu in PsGpu mode") + dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num) if program._heter_pipeline_opt is None: - trainer_instance = self._default_executor.init_for_dataset( + trainer_instance = self._default_executor.init_for_dataset( # -->InitForDataset program.desc, trainer._desc(), scope, dataset.dataset) else: # cache trainer instance for heterps pipeline training @@ -1869,6 +1876,7 @@ class Executor(object): if trainer_instance is None: trainer_instance = self._default_executor.init_for_dataset( program.desc, trainer._desc(), scope, dataset.dataset) + #print("test_fl_ps - trainer_desc: {}\n".format(trainer)) self._add_trainer_cache(cache_key, trainer_instance) else: trainer_instance.ResetDataset(dataset.dataset) @@ -2341,20 +2349,6 @@ class Executor(object): fetch_info=None, print_period=100, fetch_handler=None): - return self._start_heter_trainer(program, scope, False, debug, - fetch_list, fetch_info, print_period, - fetch_handler) - - def _start_heter_trainer(self, - program=None, - scope=None, - is_infer=False, - debug=False, - fetch_list=None, - fetch_info=None, - print_period=100, - fetch_handler=None): - scope, trainer = self._prepare_trainer( program=program, dataset=None, @@ -2365,7 +2359,7 @@ class Executor(object): fetch_info=fetch_info, print_period=print_period) - trainer._set_infer(is_infer) + trainer._set_infer(False) trainer._gen_trainer_desc() self._dump_debug_info(program=program, trainer=trainer) diff --git a/python/paddle/fluid/tests/custom_op/ps_usr_print_log b/python/paddle/fluid/tests/custom_op/ps_usr_print_log deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/python/paddle/fluid/tests/unittests/ps/dataset_generator_A.py b/python/paddle/fluid/tests/unittests/ps/dataset_generator_A.py new file mode 100755 index 00000000000..9aa7452423f --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ps/dataset_generator_A.py @@ -0,0 +1,49 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import paddle.fluid.incubate.data_generator as dg + +cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] +cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] +cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] +hash_dim_ = 1000001 +continuous_range_ = range(1, 14) +categorical_range_ = range(14, 40) + + +class CriteoDataset(dg.MultiSlotDataGenerator): + def generate_sample(self, line): + """ + Read the data line by line and process it as a dictionary + """ + + def reader(): + """ + This function needs to be implemented by the user, based on data format + """ + features = line.rstrip('\n').split('\t') + feature_name = [] + sparse_feature = [] + for idx in categorical_range_: + sparse_feature.append( + [hash(str(idx) + features[idx]) % hash_dim_]) + for idx in categorical_range_: + feature_name.append("C" + str(idx - 13)) + yield list(zip(feature_name, sparse_feature)) + + return reader + + +d = CriteoDataset() +d.run_from_stdin() diff --git a/python/paddle/fluid/tests/unittests/ps/dataset_generator_B.py b/python/paddle/fluid/tests/unittests/ps/dataset_generator_B.py new file mode 100755 index 00000000000..d76897a240c --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ps/dataset_generator_B.py @@ -0,0 +1,53 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid.incubate.data_generator as dg + +cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] +cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] +cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] +hash_dim_ = 1000001 +continuous_range_ = range(1, 14) +categorical_range_ = range(14, 40) + + +class CriteoDataset(dg.MultiSlotDataGenerator): + def generate_sample(self, line): + """ + Read the data line by line and process it as a dictionary + """ + + def reader(): + """ + This function needs to be implemented by the user, based on data format + """ + features = line.rstrip('\n').split('\t') + dense_feature = [] + for idx in continuous_range_: + if features[idx] == "": + dense_feature.append(0.0) + else: + dense_feature.append( + (float(features[idx]) - cont_min_[idx - 1]) / + cont_diff_[idx - 1]) + label = [int(features[0])] + feature_name = ["dense_feature"] + feature_name.append("label") + yield list(zip(feature_name, [label] + [dense_feature])) + + return reader + + +d = CriteoDataset() +d.run_from_stdin() diff --git a/python/paddle/fluid/tests/unittests/ps/download_data.sh b/python/paddle/fluid/tests/unittests/ps/download_data.sh new file mode 100755 index 00000000000..498d9df9c2b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ps/download_data.sh @@ -0,0 +1,27 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz +tar -zxvf ctr_data.tar.gz +mv ./raw_data ./train_data_full +mkdir train_data && cd train_data +cp ../train_data_full/part-0 ../train_data_full/part-1 ./ && cd .. +mv ./test_data ./test_data_full +mkdir test_data && cd test_data +cp ../test_data_full/part-220 ./ && cd .. +echo "Complete data download." +echo "Full Train data stored in ./train_data_full " +echo "Full Test data stored in ./test_data_full " +echo "Rapid Verification train data stored in ./train_data " +echo "Rapid Verification test data stored in ./test_data " diff --git a/python/paddle/fluid/tests/unittests/ps/fl_async_ps_config.yaml b/python/paddle/fluid/tests/unittests/ps/fl_async_ps_config.yaml new file mode 100755 index 00000000000..3e02046f71c --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ps/fl_async_ps_config.yaml @@ -0,0 +1,39 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# refer to PaddleRec/models/rank/dnn/benchmark.yaml + +hyper_parameters: + optimizer: + class: Adam + learning_rate: 0.0001 + adam_lazy_mode: True + sparse_inputs_slots: 27 + sparse_feature_number: 1000001 + sparse_feature_dim: 10 + dense_input_dim: 13 + fc_sizes: [] + +runner: + sync_mode: "async" # sync / async / geo / heter + is_fl_ps_mode: 1 + reader_thread_num: 16 + use_gpu: 0 + batch_size: 2 + train_files_path: "./train_data" + epoch_num: 4 + + model_path: "../ps_dnn_model.py" + + diff --git a/python/paddle/fluid/tests/unittests/ps/fl_ps_trainer.py b/python/paddle/fluid/tests/unittests/ps/fl_ps_trainer.py new file mode 100755 index 00000000000..6e9eefe879d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ps/fl_ps_trainer.py @@ -0,0 +1,145 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import division +from __future__ import print_function + +import os +import unittest +import numpy as np +import time +import paddle +from paddle.distributed.ps.utils.public import ps_log_root_dir, debug_program +import paddle.distributed.fleet as fleet +import paddle.fluid as fluid + + +def get_dataset(inputs, config, pipe_cmd, role="worker"): + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_use_var(inputs) + dataset.set_pipe_command(pipe_cmd) + dataset.set_batch_size(config.get('runner.batch_size')) + reader_thread_num = int(config.get('runner.reader_thread_num')) + dataset.set_thread(reader_thread_num) + train_files_path = config.get('runner.train_files_path') + print('train_data_files:{}'.format(train_files_path)) + file_list = [ + os.path.join(train_files_path, x) for x in os.listdir(train_files_path) + ] + if role == "worker": + file_list = fleet.util.get_file_shard(file_list) + print("worker file list: {}".format(file_list)) + elif role == "heter_worker": + file_list = fleet.util.get_heter_file_shard(file_list) + print("heter worker file list: {}".format(file_list)) + + return dataset, file_list + + +def fl_ps_train(): + # 0. get role + import paddle.distributed.fleet.base.role_maker as role_maker + role_maker = role_maker.PaddleCloudRoleMaker() + role_maker._generate_role() + fleet.util._set_role_maker(role_maker) + + # 1. load yaml-config to dict-config + from ps_dnn_trainer import YamlHelper, StaticModel, get_user_defined_strategy + yaml_helper = YamlHelper() + config_yaml_path = '../ps/fl_async_ps_config.yaml' + config = yaml_helper.load_yaml(config_yaml_path) + #yaml_helper.print_yaml(config) + + # 2. get static model + paddle.enable_static() + model = StaticModel(config) + feeds_list = model.create_feeds() + metrics = model.fl_net(feeds_list) + loss = model._cost + + # 3. compile time - build program_desc + user_defined_strategy = get_user_defined_strategy(config) + a_sync_configs = user_defined_strategy.a_sync_configs + a_sync_configs["launch_barrier"] = True + user_defined_strategy.a_sync_configs = a_sync_configs + print("launch_barrier: ", + user_defined_strategy.a_sync_configs["launch_barrier"]) + learning_rate = config.get("hyper_parameters.optimizer.learning_rate") + inner_optimizer = paddle.optimizer.Adam(learning_rate, lazy_mode=True) + from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer + ps_optimizer = ParameterServerOptimizer(inner_optimizer) + ps_optimizer._set_basic_info(loss, role_maker, inner_optimizer, + user_defined_strategy) + ps_optimizer.minimize_impl(loss) + + # 4. 
runtime
+    from paddle.distributed.ps.the_one_ps import TheOnePSRuntime
+    _runtime_handle = TheOnePSRuntime()  # the refactored TheOnePSRuntime under the ps directory
+    _runtime_handle._set_basic_info(ps_optimizer.pass_ctx._attrs)
+    epoch_num = int(config.get('runner.epoch_num'))
+    # 4.1 run server - build fleet_desc
+    if role_maker._is_server():
+        _runtime_handle._init_server()
+        _runtime_handle._run_server()
+    # 4.2 run worker
+    elif role_maker._is_worker():
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        exe.run(fluid.default_startup_program())
+        _runtime_handle._init_worker()
+        print('trainer get dataset')
+        inputs = feeds_list[1:-1]
+        dataset, file_list = get_dataset(inputs, config,
+                                         "python dataset_generator_A.py")
+        print("fluid.default_main_program: {}".format(
+            fluid.default_main_program()._heter_pipeline_opt))
+        for epoch in range(epoch_num):
+            # if party A and party B shuffle at file granularity, they must share the same fixed seed
+            dataset.set_filelist(file_list)
+            start_time = time.time()
+            exe.train_from_dataset(
+                program=fluid.default_main_program(),
+                dataset=dataset,
+                print_period=2,
+                debug=False)
+            end_time = time.time()
+            print("trainer epoch %d finished, use time=%d\n" % (
+                (epoch), end_time - start_time))
+        exe.close()
+        _runtime_handle._stop_worker()
+        print("Fl partyA Trainer Success!")
+    else:
+        exe = fluid.Executor()
+        exe.run(fluid.default_startup_program())
+        _runtime_handle._init_worker()
+        inputs = [feeds_list[0],
+                  feeds_list[-1]]  # the order must stay consistent with dataset_generator_B.py
+        dataset, file_list = get_dataset(
+            inputs, config, "python dataset_generator_B.py", "heter_worker")
+        print("fluid.default_main_program: {}".format(
+            fluid.default_main_program()._heter_pipeline_opt))
+        for epoch in range(epoch_num):
+            dataset.set_filelist(file_list)
+            exe.train_from_dataset(
+                program=fluid.default_main_program(),
+                dataset=dataset,
+                print_period=2,
+                debug=False)
+        exe.close()
+        _runtime_handle._stop_worker()
+        print("Fl partyB Trainer Success!")
+
+
+if __name__ == '__main__':
+    fl_ps_train()
diff --git a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py
index 0fd64b0d923..a2ec563efd8 100755
--- a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py
+++ b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py
@@ -35,7 +35,7 @@ sys.path.append(os.path.abspath(os.path.join(__dir__, '..')))
 def is_distributed_env():
     node_role = os.getenv("TRAINING_ROLE")
-    logger.info("-- Role: {} --".format(node_role))
+    print("-- Role: {} --".format(node_role))
     if node_role is None:
         return False
     else:
@@ -167,6 +167,14 @@ def get_user_defined_strategy(config):
     elif sync_mode == "async":
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = True
+        strategy.is_fl_ps_mode = True if config.get(
+            "runner.is_fl_ps_mode") == 1 else False
+        if strategy.is_fl_ps_mode == True:
+            strategy.pipeline = False
+            micro_num = 1
+            strategy.pipeline_configs = {
+                "accumulate_steps": micro_num
+            }  ## num_microbatches
     elif sync_mode == "geo":
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.a_sync = True
@@ -215,13 +223,14 @@ def get_user_defined_strategy(config):
     print("strategy table config:", strategy.sparse_table_configs)
     a_sync_configs = strategy.a_sync_configs
     a_sync_configs["launch_barrier"] = False
+    # a_sync_configs["launch_barrier"] = True
     strategy.a_sync_configs = a_sync_configs
     print("launch_barrier: ", strategy.a_sync_configs["launch_barrier"])
     return strategy
 
 
-def get_distributed_strategy(user_defined_strategy):
+def 
get_distributed_strategy(user_defined_strategy): # pslib from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory k_steps = user_defined_strategy.a_sync_configs["k_steps"] @@ -318,14 +327,14 @@ class DnnTrainer(object): fleet.init() if fleet.is_server(): - logger.info("server: {} started".format(fleet.server_index())) + print("server: {} started".format(fleet.server_index())) else: - logger.info("worker: {} started".format(fleet.worker_index())) + print("worker: {} started".format(fleet.worker_index())) def run_minimize(self): self.init_fleet_with_gloo() self.model = get_model(self.config) - logger.info("cpu_num: {}".format(os.getenv("CPU_NUM"))) + print("cpu_num: {}".format(os.getenv("CPU_NUM"))) self.input_data = self.model.create_feeds() self.metrics = self.model.net(self.input_data) loss = self.model._cost @@ -337,14 +346,14 @@ class DnnTrainer(object): self.role_maker._generate_role() # 必要 if self.config['debug_new_minimize'] == 1: - logger.info("entering run_minimize -- new") + print("entering run_minimize -- new") from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer ps_optimizer = ParameterServerOptimizer(inner_optimizer) ps_optimizer._set_basic_info(loss, self.role_maker, inner_optimizer, user_defined_strategy) ps_optimizer.minimize_impl(loss) else: - logger.info("entering run_minimize -- old") + print("entering run_minimize -- old") fleet_obj = fleet.distributed_optimizer( inner_optimizer, user_defined_strategy) ## Fleet 对象 fleet_obj.minimize(loss) @@ -376,7 +385,7 @@ class DnnTrainer(object): startup_program = paddle.static.default_startup_program() inner_optimizer.minimize(loss, startup_program) if self.config['debug_new_pass'] == 1: - logger.info("entering run {} - new".format( + print("entering run {} - new".format( str(config["applied_pass_name"]))) from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer ps_optimizer = ParameterServerOptimizer(inner_optimizer) @@ -390,7 +399,7 @@ class DnnTrainer(object): ps_optimizer.pass_ctx._attrs) append_send_ops_pass.apply([_main], [None], ps_optimizer.pass_ctx) else: - logger.info("entering run {} - old".format( + print("entering run {} - old".format( str(config["applied_pass_name"]))) from paddle.fluid.incubate.fleet.parameter_server.ir import public as public dist_strategy = get_distributed_strategy(user_defined_strategy) @@ -428,7 +437,7 @@ class DnnTrainer(object): self.role_maker._generate_role() # 必要 if self.config['debug_the_one_ps'] == 1: - logger.info("entering run_the_one_ps -- new") + print("entering run_the_one_ps -- new") from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer ps_optimizer = ParameterServerOptimizer(inner_optimizer) @@ -455,7 +464,7 @@ class DnnTrainer(object): else: pass ''' - logger.info("entering run_the_one_ps -- old") + print("entering run_the_one_ps -- old") fleet_obj = fleet.distributed_optimizer( inner_optimizer, user_defined_strategy) fleet_obj.minimize(loss) @@ -486,7 +495,7 @@ class DnnTrainer(object): if __name__ == "__main__": paddle.enable_static() config = parse_args() - logger.info(">>>>>>>>>> python process started") + print(">>>>>>>>>> python process started") os.environ["CPU_NUM"] = str(config.get("runner.thread_num")) benchmark_main = DnnTrainer(config) if config['run_single_pass'] == 1: diff --git a/python/paddle/fluid/tests/unittests/ps/test_fl_ps.py b/python/paddle/fluid/tests/unittests/ps/test_fl_ps.py new file mode 
100755
index 00000000000..2dc5b919d0d
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/ps/test_fl_ps.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import shlex
+from paddle.fluid.tests.unittests.distributed_passes.dist_pass_test_base import prepare_python_path_and_return_module, remove_path_if_exists
+import os
+
+
+class FlPsTest(unittest.TestCase):
+    def test_launch_fl_ps(self):
+        pass
+        '''
+        cmd = [
+            'python', '-m', 'paddle.distributed.fleet.launch', '--log_dir',
+            '/ps_log/fl_ps', '--servers', "127.0.0.1:8070", '--workers',
+            "127.0.0.1:8080,127.0.0.1:8081", '--heter_workers',
+            "127.0.0.1:8090,127.0.0.1:8091", '--heter_devices', "cpu",
+            '--worker_num', "2", '--heter_worker_num', "2", 'fl_ps_trainer.py'
+        ]
+        cmd = [shlex.quote(c) for c in cmd]
+        prepare_python_path_and_return_module(__file__)
+        exitcode = os.system(' '.join(cmd))
+        '''
+
+
+if __name__ == '__main__':
+    remove_path_if_exists('/ps_log')
+    remove_path_if_exists('/ps_usr_print_log')
+    if not os.path.exists('./train_data'):
+        os.system('sh download_data.sh')
+        os.system('rm -rf ctr_data.tar.gz')
+        os.system('rm -rf train_data_full')
+        os.system('rm -rf test_data_full')
+    unittest.main()
+    if os.path.exists('./train_data'):
+        os.system('rm -rf train_data')
+        os.system('rm -rf test_data')
diff --git a/python/paddle/fluid/tests/unittests/ps_dnn_model.py b/python/paddle/fluid/tests/unittests/ps_dnn_model.py
index 8d91e0f4678..f41f03297c9 100755
--- a/python/paddle/fluid/tests/unittests/ps_dnn_model.py
+++ b/python/paddle/fluid/tests/unittests/ps_dnn_model.py
@@ -17,7 +17,6 @@ import paddle.nn as nn
 import paddle.nn.functional as F
 import math
 import paddle.distributed.fleet as fleet
-from paddle.distributed.ps.utils.public import logger
 
 
 class DNNLayer(nn.Layer):
@@ -90,6 +89,154 @@ class DNNLayer(nn.Layer):
         return y_dnn
 
 
+class FlDNNLayer(nn.Layer):
+    def __init__(self,
+                 sparse_feature_number,
+                 sparse_feature_dim,
+                 dense_feature_dim,
+                 sparse_number,
+                 sync_mode=None):
+        super(FlDNNLayer, self).__init__()
+
+        self.PART_A_DEVICE_FlAG = 'gpu:0'
+        self.PART_A_JOINT_OP_DEVICE_FlAG = 'gpu:2'
+        self.PART_B_DEVICE_FlAG = 'gpu:1'
+        self.PART_B_JOINT_OP_DEVICE_FlAG = 'gpu:3'
+
+        self.sync_mode = sync_mode
+        self.sparse_feature_number = sparse_feature_number
+        self.sparse_feature_dim = sparse_feature_dim
+        self.slot_num = sparse_number
+        self.dense_feature_dim = dense_feature_dim
+
+        layer_sizes_a = [self.slot_num * self.sparse_feature_dim, 5,
+                         7]  # for test
+        layer_sizes_b = [self.dense_feature_dim, 6, 7]
+        layer_sizes_top = [7, 2]
+
+        self.embedding = paddle.nn.Embedding(
+            self.sparse_feature_number,
+            self.sparse_feature_dim,
+            sparse=True,
+            weight_attr=paddle.ParamAttr(
+                name="SparseFeatFactors",
+                initializer=paddle.nn.initializer.Uniform()))
+
+        # part_a fc
+        acts = ["relu" for _ in range(len(layer_sizes_a))]
+        self._mlp_layers_a = []
+        for i in range(len(layer_sizes_a) - 1):
+            linear = 
paddle.nn.Linear( + in_features=layer_sizes_a[i], + out_features=layer_sizes_a[i + 1], + weight_attr=paddle.ParamAttr( + initializer=paddle.nn.initializer.Normal( + std=1.0 / math.sqrt(layer_sizes_a[i])))) + self.add_sublayer('linear_%d' % i, linear) + self._mlp_layers_a.append(linear) + act = paddle.nn.ReLU() + self.add_sublayer('act_%d' % i, act) + self._mlp_layers_a.append(act) + + # part_b fc + acts = ["relu" for _ in range(len(layer_sizes_b))] + self._mlp_layers_b = [] + for i in range(len(layer_sizes_b) - 1): + linear = paddle.nn.Linear( + in_features=layer_sizes_b[i], + out_features=layer_sizes_b[i + 1], + weight_attr=paddle.ParamAttr( + initializer=paddle.nn.initializer.Normal( + std=1.0 / math.sqrt(layer_sizes_b[i])))) + self.add_sublayer('linear_%d' % i, linear) + self._mlp_layers_b.append(linear) + act = paddle.nn.ReLU() + self.add_sublayer('act_%d' % i, act) + self._mlp_layers_b.append(act) + + # top fc + acts = ["relu" for _ in range(len(layer_sizes_top))] + self._mlp_layers_top = [] + for i in range(len(layer_sizes_top) - 1): + linear = paddle.nn.Linear( + in_features=layer_sizes_top[i], + out_features=layer_sizes_top[i + 1], + weight_attr=paddle.ParamAttr( + initializer=paddle.nn.initializer.Normal( + std=1.0 / math.sqrt(layer_sizes_top[i])))) + self.add_sublayer('linear_%d' % i, linear) + self._mlp_layers_top.append(linear) + act = paddle.nn.ReLU() + self.add_sublayer('act_%d' % i, act) + self._mlp_layers_top.append(act) + + def bottom_a_layer(self, sparse_inputs): + with paddle.fluid.device_guard(self.PART_A_DEVICE_FlAG): + sparse_embs = [] + for s_input in sparse_inputs: + emb = self.embedding(s_input) + emb = paddle.reshape(emb, shape=[-1, self.sparse_feature_dim]) + sparse_embs.append(emb) + + y = paddle.concat(x=sparse_embs, axis=1) + y = self._mlp_layers_a[0](y) + y = self._mlp_layers_a[1](y) + + y = self._mlp_layers_a[2](y) + with paddle.fluid.device_guard( + self.PART_A_JOINT_OP_DEVICE_FlAG): # joint point + bottom_a = self._mlp_layers_a[3](y) + + return bottom_a + + def bottom_b_layer(self, dense_inputs): + with paddle.fluid.device_guard(self.PART_B_DEVICE_FlAG): + y = self._mlp_layers_b[0](dense_inputs) + y = self._mlp_layers_b[1](y) + + y = self._mlp_layers_b[2](y) + bottom_b = self._mlp_layers_b[3](y) + + return bottom_b + + def interactive_layer(self, bottom_a, bottom_b): + with paddle.fluid.device_guard( + self.PART_B_JOINT_OP_DEVICE_FlAG): # joint point + interactive = paddle.fluid.layers.elementwise_add(bottom_a, + bottom_b) + return interactive + + def top_layer(self, interactive, label_input): + with paddle.fluid.device_guard(self.PART_B_DEVICE_FlAG): + y = self._mlp_layers_top[0](interactive) + y_top = self._mlp_layers_top[1](y) + predict_2d = paddle.nn.functional.softmax(y_top) + auc, batch_auc, [ + self.batch_stat_pos, self.batch_stat_neg, self.stat_pos, + self.stat_neg + ] = paddle.static.auc(input=predict_2d, + label=label_input, + num_thresholds=2**12, + slide_steps=20) + + cost = paddle.nn.functional.cross_entropy( + input=y_top, label=label_input) + avg_cost = paddle.mean(x=cost) + + return auc, avg_cost + + def forward(self, sparse_inputs, dense_inputs, label_input): + bottom_a = self.bottom_a_layer(sparse_inputs) + + bottom_b = self.bottom_b_layer(dense_inputs) + + interactive = self.interactive_layer(bottom_a, bottom_b) + + auc, avg_cost = self.top_layer(interactive, label_input) + + return auc, avg_cost + + class StaticModel(): def __init__(self, config): self.cost = None @@ -147,13 +294,9 @@ class StaticModel(): sparse_number, self.fc_sizes, 
sync_mode=self.sync_mode) - raw_predict_2d = dnn_model.forward(self.sparse_inputs, self.dense_input) - predict_2d = paddle.nn.functional.softmax(raw_predict_2d) - self.predict = predict_2d - auc, batch_auc, [ self.batch_stat_pos, self.batch_stat_neg, self.stat_pos, self.stat_neg @@ -173,3 +316,22 @@ class StaticModel(): fetch_dict = {'cost': avg_cost, 'auc': auc} return fetch_dict + + def fl_net(self, input, is_infer=False): + self.label_input = input[0] + self.sparse_inputs = input[1:self.sparse_inputs_slots] + self.dense_input = input[-1] + self.sparse_number = self.sparse_inputs_slots - 1 + + fl_dnn_model = FlDNNLayer( + self.sparse_feature_number, + self.sparse_feature_dim, + self.dense_input_dim, + self.sparse_number, + sync_mode=self.sync_mode) + + auc, avg_cost = fl_dnn_model.forward(self.sparse_inputs, + self.dense_input, self.label_input) + fetch_dict = {'cost': avg_cost, 'auc': auc} + self._cost = avg_cost + return fetch_dict -- GitLab
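
Note: to exercise the new FL-PS path locally, the launch command left commented out in test_fl_ps.py above can be run directly. The sketch below is a minimal, non-authoritative example assuming the same placeholder endpoints, log directory, and 2-worker / 2-heter-worker layout used in that test, and that it is started from python/paddle/fluid/tests/unittests/ps/ so the relative config and dataset-generator paths in fl_ps_trainer.py resolve; adjust ports and paths for the local environment.

# Minimal local launch sketch for the FL-PS demo; endpoints and counts are the
# placeholder values from test_fl_ps.py, not a fixed requirement.
import os
import shlex

cmd = [
    'python', '-m', 'paddle.distributed.fleet.launch',
    '--log_dir', '/ps_log/fl_ps',
    '--servers', '127.0.0.1:8070',
    '--workers', '127.0.0.1:8080,127.0.0.1:8081',
    '--heter_workers', '127.0.0.1:8090,127.0.0.1:8091',
    '--heter_devices', 'cpu',
    '--worker_num', '2',
    '--heter_worker_num', '2',
    'fl_ps_trainer.py',
]
exit_code = os.system(' '.join(shlex.quote(c) for c in cmd))
print('fl_ps launch exit code: {}'.format(exit_code))

This mirrors the N + N layout exercised by fl_ps_trainer.py: the party-A workers feed the sparse slots through dataset_generator_A.py, while the party-B heter workers feed the dense features and labels through dataset_generator_B.py.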