Unverified commit d999049f, authored by: Z ziyoujiyi, committed by: GitHub

add federated learning parameter server (fl-ps) mode (#42682)

* back fl

* delete ssl cert

* .

* make warning

* .

* unittest paral degree

* solve unittest

* heter & multi cloud comm ready

* .

* .

* fl-ps v1.0

* .

* support N + N mode

* .

* .

* .

* .

* delete print

* .

* .

* .

* .
Parent 2810dfea
......@@ -255,6 +255,7 @@ option(WITH_POCKETFFT "Compile with pocketfft support" ON)
option(WITH_RECORD_BUILDTIME "Compile PaddlePaddle with record all targets build time" OFF)
option(WITH_CUSTOM_DEVICE "Compile with custom device support" OFF)
option(WITH_ARM_BRPC "Support Brpc in Arm" OFF)
option(WITH_FLPS "FL PS mode" OFF)
if(WITH_RECORD_BUILDTIME)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh ${CMAKE_CURRENT_BINARY_DIR}")
......
......@@ -78,6 +78,10 @@ if(WITH_ARM_BRPC)
add_definitions(-DPADDLE_WITH_ARM_BRPC)
endif()
if(WITH_FLPS)
add_definitions(-DPADDLE_WITH_FLPS)
endif()
if(WITH_GLOO)
add_definitions(-DPADDLE_WITH_GLOO)
endif()
......
File mode changed from 100644 to 100755
......@@ -139,8 +139,9 @@ void HeterClient::SendAndRecvAsync(
message_name, send_var_name_val, recv_var_name_val, *p_ctx, p_scope,
&request, &request_io_buffer);
int micro_id = GetMicroId(ctx, p_scope);
int micro_id = GetMicroId(ctx, p_scope); // global
auto minibatch_id = micro_id / 10;
VLOG(4) << "micro_id: " << micro_id;
// select channel according to micro id
if (mode == "forward") {
int num = minibatch_id % xpu_channels_.size();
......
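The hunk above derives a minibatch id from the global micro id (micro_id / 10) and then picks a forward channel with minibatch_id % xpu_channels_.size(); the server-side handler further down recovers the minibatch/microbatch indices with the same arithmetic. A minimal Python sketch of that routing rule, assuming 10 microbatches per minibatch as the code does:

def route_micro_id(micro_id, num_channels):
    # the global micro id encodes minibatch_id * 10 + microbatch_id
    minibatch_id = micro_id // 10
    microbatch_id = micro_id % 10
    channel = minibatch_id % num_channels  # forward channel selection in HeterClient
    return minibatch_id, microbatch_id, channel

# micro_id 23 with 2 xpu channels -> minibatch 2, microbatch 3, channel 0
print(route_micro_id(23, 2))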
......@@ -155,13 +155,13 @@ class HeterClient {
// HeterClient singleton
static std::shared_ptr<HeterClient> GetInstance(
const std::vector<std::string>& endpoint,
const std::vector<std::string>& previous_endpoint,
const std::vector<std::string>& endpoints,
const std::vector<std::string>& previous_endpoints,
const int& trainer_id) {
if (NULL == s_instance_) {
s_instance_.reset(new HeterClient());
s_instance_->SetXpuList(endpoint);
s_instance_->SetPreviousXpuList(previous_endpoint);
s_instance_->SetXpuList(endpoints);
s_instance_->SetPreviousXpuList(previous_endpoints);
s_instance_->SetTrainerID(trainer_id);
s_instance_->CreateClient2XpuConnection();
}
......
......@@ -94,7 +94,6 @@ void HeterServer::StartHeterInterService(bool neeed_encrypt) {
VLOG(4) << "switch inter server server start success! listen on "
<< endpoint_inter_;
}
{
std::lock_guard<std::mutex> lock(this->mutex_ready_);
stoped_ = false;
......@@ -115,9 +114,6 @@ void HeterServer::SetFanin(const int& fan_in) { service_.SetFanin(fan_in); }
void HeterServer::WaitServerReady() {
std::unique_lock<std::mutex> lock(this->mutex_ready_);
condition_ready_.wait(lock, [=] { return this->ready_ == 1; });
while (!this->ready_) {
sleep(1);
}
}
int SendAndRecvVariableHandler::SaveInSwitchWithShard(
......
......@@ -90,8 +90,10 @@ class ServiceHandlerBase {
using SharedMiniScope =
std::shared_ptr<std::unordered_map<int, ::paddle::framework::Scope*>>;
using SharedMicroScope = std::shared_ptr<std::unordered_map<
int, std::shared_ptr<std::vector<::paddle::framework::Scope*>>>>;
using SharedTaskQueue = std::shared_ptr<
std::unordered_map<int, std::shared_ptr<::paddle::framework::BlockingQueue<
std::pair<std::string, int>>>>>;
......@@ -226,6 +228,7 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase {
auto* tensor = var->GetMutable<framework::LoDTensor>();
auto data = reinterpret_cast<const float*>(tensor->data());
auto micro_id = static_cast<int>(data[0]);
VLOG(4) << "micro_id in heter server: " << micro_id;
int minibatch_index = micro_id / 10;
int microbatch_index = micro_id % 10;
......@@ -261,6 +264,9 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase {
distributed::DeserializeFromMultiVarMsgAndIOBuf(
*request, &request_io_buffer, *dev_ctx_, micro_scope);
// blocking queue handles multi thread
VLOG(4) << "Handle in HeterServer: " << message_name << ", "
<< microbatch_index;
VLOG(4) << "task_queue_ size: " << task_queue_->size();
(*task_queue_)[minibatch_index]->Push(
std::make_pair(message_name, microbatch_index));
......@@ -274,6 +280,7 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase {
distributed::SerializeToMultiVarMsgAndIOBuf(
message_name, response_var_names, empty_var_names, *dev_ctx_,
&local_scope, response, &response_io_buffer);
VLOG(4) << "Handle over";
return 0;
}
......@@ -612,12 +619,10 @@ class HeterServer {
// HeterWrapper singleton
static std::shared_ptr<HeterServer> GetInstance() {
if (s_instance_ == nullptr) {
std::unique_lock<std::mutex> lock(mtx_);
if (NULL == s_instance_) {
if (s_instance_ == nullptr) {
s_instance_.reset(new HeterServer());
}
}
return s_instance_;
}
......
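The GetInstance change above keeps the double-checked locking around the lazily created HeterServer but uses nullptr consistently. A small Python sketch of the same lazy, thread-safe singleton idiom (an illustration of the pattern only, not a Paddle API):

import threading

class LazySingleton:
    _instance = None
    _mtx = threading.Lock()

    @classmethod
    def get_instance(cls):
        if cls._instance is None:          # fast path, no lock taken
            with cls._mtx:                 # slow path, serialize creation
                if cls._instance is None:  # re-check under the lock
                    cls._instance = cls()
        return cls._instance

assert LazySingleton.get_instance() is LazySingleton.get_instance()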
......@@ -220,6 +220,7 @@ bool DataFeed::PickOneFile(std::string* filename) {
file_idx_, platform::errors::PreconditionNotMet(
"You should call SetFileListIndex before PickOneFile"));
std::unique_lock<std::mutex> lock(*mutex_for_pick_file_);
VLOG(4) << "filelist_ size: " << filelist_.size();
if (*file_idx_ == filelist_.size()) {
VLOG(3) << "DataFeed::PickOneFile no more file to pick";
return false;
......@@ -284,6 +285,7 @@ void PrivateQueueDataFeed<T>::SetQueueSize(int queue_size) {
template <typename T>
bool PrivateQueueDataFeed<T>::Start() {
VLOG(4) << "entering PrivateQueueDataFeed<T>::Start()";
CheckSetFileList();
read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this);
read_thread_.detach();
......@@ -295,6 +297,7 @@ bool PrivateQueueDataFeed<T>::Start() {
template <typename T>
void PrivateQueueDataFeed<T>::ReadThread() {
#ifdef _LINUX
VLOG(4) << "entering PrivateQueueDataFeed<T>::ReadThread()";
std::string filename;
while (PickOneFile(&filename)) {
int err_no = 0;
......@@ -356,6 +359,7 @@ InMemoryDataFeed<T>::InMemoryDataFeed() {
template <typename T>
bool InMemoryDataFeed<T>::Start() {
#ifdef _LINUX
VLOG(4) << "entering InMemoryDataFeed<T>::Start()";
this->CheckSetFileList();
if (output_channel_->Size() == 0 && input_channel_->Size() != 0) {
std::vector<T> data;
......@@ -664,6 +668,7 @@ void MultiSlotDataFeed::Init(
void MultiSlotDataFeed::ReadThread() {
#ifdef _LINUX
VLOG(4) << "entering MultiSlotDataFeed::ReadThread()";
std::string filename;
while (PickOneFile(&filename)) {
int err_no = 0;
......@@ -831,7 +836,6 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe(
} else {
int use_slots_num = use_slots_.size();
instance->resize(use_slots_num);
const char* str = reader.get();
std::string line = std::string(str);
......@@ -971,10 +975,13 @@ void MultiSlotDataFeed::PutToFeedVec(
if (feed_vec_[i] == nullptr) {
continue;
}
VLOG(4) << "MultiSlotDataFeed::PutToFeedVec i: " << i;
const auto& type = ins_vec[i].GetType();
const auto& offset = ins_vec[i].GetOffset();
int total_instance = static_cast<int>(offset.back());
VLOG(4) << "total_instance: " << total_instance;
// platform::CPUPlace()
VLOG(4) << "this->place_: " << this->place_;
if (type[0] == 'f') { // float
const auto& feasign = ins_vec[i].GetFloatData();
float* tensor_ptr =
......@@ -2573,6 +2580,7 @@ void SlotRecordInMemoryDataFeed::ExpandSlotRecord(SlotRecord* rec) {
}
bool SlotRecordInMemoryDataFeed::Start() {
VLOG(4) << "entering SlotRecordInMemoryDataFeed::Start";
#ifdef _LINUX
this->CheckSetFileList();
if (input_channel_->Size() != 0) {
......
......@@ -315,6 +315,7 @@ message DistributedStrategy {
optional bool adam_d2sum = 36 [ default = false ];
optional bool auto_search = 37 [ default = false ];
optional bool heter_ccl_mode = 38 [ default = false ];
optional bool is_fl_ps_mode = 39 [ default = false ];
optional RecomputeConfig recompute_configs = 101;
optional AMPConfig amp_configs = 102;
......
......@@ -32,7 +32,9 @@ using TaskQueue =
std::pair<std::string, int>>>>;
void HeterPipelineTrainer::ResetDataset(Dataset* dataset) {
#ifndef PADDLE_WITH_FLPS
if (pipeline_stage_ == 0) {
#endif
SetDataset(dataset);
const std::vector<paddle::framework::DataFeed*> readers =
dataset->GetReaders();
......@@ -51,40 +53,39 @@ void HeterPipelineTrainer::ResetDataset(Dataset* dataset) {
this_worker->SetDataFeed(readers[cnt]);
this_worker->SetReaderPlace(place_);
}
#ifndef PADDLE_WITH_FLPS
}
#endif
}
void HeterPipelineTrainer::Initialize(const TrainerDesc& trainer_desc,
Dataset* dataset) {
trainer_desc_ = trainer_desc;
thread_num_ = trainer_desc.thread_num();
ParseDumpConfig(trainer_desc);
SetDebug(trainer_desc.debug());
const std::vector<paddle::framework::DataFeed*> readers =
dataset->GetReaders();
VLOG(3) << "readers num: " << readers.size();
// change thread num to readers num
thread_num_ = readers.size();
VLOG(3) << "worker thread num: " << thread_num_;
VLOG(3) << "worker(readers) thread num: " << thread_num_;
const auto& heter_section_params = trainer_desc.heter_section_param();
num_pipeline_stages_ = heter_section_params.num_pipeline_stages();
pipeline_stage_ = heter_section_params.pipeline_stage();
num_microbatches_ = heter_section_params.num_microbatches();
VLOG(3) << "Number of microbatches per minibatch: " << num_microbatches_;
trainer_desc_ = trainer_desc;
trainer_id_ = trainer_desc.trainer_id();
for (int i = 0; i < num_pipeline_stages_; ++i) {
auto trainer_num = trainer_desc.trainers(i);
trainers_.push_back(trainer_num);
}
int cpu_trainer_num = trainers_[0];
// int cur_stage_trainer_num = trainers_[pipeline_stage_];
// int global_thread_num = cpu_trainer_num * thread_num_;
// int previous_trainers = 0;
// for (int i = 0; i < pipeline_stage_; i++) previous_trainers +=
// trainers_[i];
// int stage_trainer_id =
// trainer_id_ - previous_trainers; // trainer id in current stage
VLOG(4) << "trainer_id_: " << trainer_id_;
VLOG(4) << "cpu_trainer_num: " << cpu_trainer_num
<< " xpu_trainer_num: " << trainers_[1];
#ifdef PADDLE_WITH_FLPS
thread_num_ = 1;
#endif
if (pipeline_stage_ == 0) { // for cpu trainer
int cnt = -1;
int real_thread_id = trainer_id_;
......@@ -103,25 +104,33 @@ void HeterPipelineTrainer::Initialize(const TrainerDesc& trainer_desc,
this_worker->InitRandomDumpConfig(trainer_desc);
this_worker->SetDeviceIndex(real_thread_id);
real_thread_id += cpu_trainer_num;
// if (pipeline_stage_ == 0) {
this_worker->SetDataFeed(readers[cnt]);
//}
this_worker->SetMicrobatchNum(num_microbatches_);
this_worker->SetPipelineStageNum(num_pipeline_stages_);
this_worker->SetPipelineStage(pipeline_stage_);
}
} else { // for heter_trainer
// heter trainer with thread_id == -1 is not for
// real training
} else {
// for heter_trainer
// heter trainer with thread_id == -1 is not for real training, just for
// running the listen op
workers_[-1] = DeviceWorkerFactory::CreateDeviceWorker(
trainer_desc.device_worker_name());
auto this_worker =
std::dynamic_pointer_cast<paddle::framework::HeterSectionWorker>(
workers_[-1]);
#ifdef PADDLE_WITH_FLPS
this_worker->SetDebug(debug_);
this_worker->SetNeedDumpField(need_dump_field_);
this_worker->SetNeedDumpParam(need_dump_param_);
this_worker->SetDumpFieldVector(dump_fields_);
this_worker->SetDumpParamVector(dump_param_);
this_worker->InitRandomDumpConfig(trainer_desc);
this_worker->SetDataFeed(readers[0]);
#endif
this_worker->SetDeviceIndex(-1);
this_worker->SetMicrobatchNum(num_microbatches_);
this_worker->SetPipelineStageNum(num_pipeline_stages_);
this_worker->SetPipelineStage(pipeline_stage_);
this_worker->SetDeviceIndex(-1);
}
}
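For the CPU stage, Initialize gives every local reader thread a globally unique device index by starting at trainer_id_ and stepping by cpu_trainer_num; under PADDLE_WITH_FLPS, thread_num_ is forced to 1, so each party-A trainer owns exactly one worker. A hedged sketch of that index layout:

def cpu_worker_indices(trainer_id, cpu_trainer_num, thread_num):
    # mirrors: real_thread_id starts at trainer_id and grows by cpu_trainer_num per reader
    ids = []
    real_thread_id = trainer_id
    for _ in range(thread_num):
        ids.append(real_thread_id)
        real_thread_id += cpu_trainer_num
    return ids

# 4 CPU trainers, 3 reader threads each: trainer 1 owns workers [1, 5, 9]
print(cpu_worker_indices(1, 4, 3))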
......@@ -159,14 +168,19 @@ void HeterPipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program,
for (auto& worker_pair : workers_) {
auto worker_index = worker_pair.first;
auto device_worker = worker_pair.second;
VLOG(0) << "workers index in InitTrainerEnv: " << worker_index;
auto this_worker =
std::dynamic_pointer_cast<paddle::framework::HeterSectionWorker>(
device_worker);
this_worker->SetPlace(place);
this_worker->Initialize(trainer_desc_);
#ifdef PADDLE_WITH_FLPS
this_worker->SetReaderPlace(place);
#else
if (pipeline_stage_ == 0) {
this_worker->SetReaderPlace(place);
}
#endif
this_worker->SetRootScope(root_scope_);
// generate mini_batch scope for every worker
auto* minibatch_scope = &root_scope_->NewScope();
......@@ -175,6 +189,7 @@ void HeterPipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program,
// after set micro num & mini batch scope
this_worker->CreateMicrobatchScopes();
(*micro_scopes_)[worker_index] = this_worker->GetMicrobatchScopes();
VLOG(4) << "worker_index: " << worker_index;
(*task_queue_)[worker_index] = this_worker->GetThreadQueue();
}
}
......@@ -182,6 +197,7 @@ void HeterPipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program,
void HeterPipelineTrainer::Run() {
VLOG(3) << "Going to run HeterPipelineTrainer::Run()";
if (listen_ptr_ == nullptr) {
VLOG(3) << "listen_ptr_ is null";
for (auto& worker_pair : workers_) {
auto& device_worker = worker_pair.second;
auto worker_0 =
......@@ -196,10 +212,14 @@ void HeterPipelineTrainer::Run() {
heter_server->WaitServerReady();
heter_server->SetMiniBatchScopes(mini_scopes_);
heter_server->SetMicroBatchScopes(micro_scopes_);
VLOG(4) << "heter_server SetTaskQueue";
heter_server->SetTaskQueue(task_queue_);
// main training logic
VLOG(3) << "pipeline_stage_ is " << pipeline_stage_;
if (pipeline_stage_ == 0) { // for cpu trainer
for (auto& worker_pair : workers_) {
VLOG(4) << "cpu worker index : " << worker_pair.first;
auto device_worker = worker_pair.second;
if (!debug_) {
threads_.push_back(
......@@ -212,6 +232,7 @@ void HeterPipelineTrainer::Run() {
} else { // for heter worker
// start thread_worker with thread_id = -1
for (auto& worker_pair : workers_) {
VLOG(4) << "xpu worker index : " << worker_pair.first;
auto device_worker = worker_pair.second;
if (!debug_) {
threads_.push_back(
......@@ -252,6 +273,10 @@ void HeterPipelineTrainer::Run() {
this_worker->SetPipelineStageNum(num_pipeline_stages_);
this_worker->SetPipelineStage(pipeline_stage_);
this_worker->SetPlace(place_);
#ifdef PADDLE_WITH_FLPS
this_worker->SetDataFeed(workers_[-1]->device_reader_);
this_worker->SetReaderPlace(place_);
#endif
this_worker->Initialize(trainer_desc_);
this_worker->SetRootScope(root_scope_);
......@@ -308,5 +333,5 @@ Scope* HeterPipelineTrainer::GetWorkerScope(int thread_id) {
}
} // end namespace framework
} // end namespace paddle
} // namespace paddle
#endif
......@@ -65,6 +65,52 @@ class TrainerDesc;
uint64_t HeterSectionWorker::batch_id_(0);
#ifdef PADDLE_WITH_FLPS
void HeterSectionWorker::Initialize(const TrainerDesc& desc) {
trainer_desc_ = desc;
fetch_config_ = desc.fetch_config();
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
program_.reset(new ProgramDesc(
desc.heter_section_param().section_config().program_desc()));
thread_queue_.reset(
new ::paddle::framework::BlockingQueue<std::pair<std::string, int>>());
VLOG(4) << "addr of thread_queue_ is: " << thread_queue_.get();
bool is_first_stage = (pipeline_stage_ == 0);
bool is_last_stage = (pipeline_stage_ + 1 == num_pipeline_stages_);
if (is_first_stage) {
VLOG(0) << "entering first stage";
for (auto& op_desc : program_->Block(0).AllOps()) {
forward_ops_.push_back(std::move(OpRegistry::CreateOp(*op_desc)));
}
for (auto& op_desc : program_->Block(1).AllOps()) {
auto op = std::move(OpRegistry::CreateOp(*op_desc));
auto op_type = op->Type();
if (listen_op_ == nullptr && op_type == "heter_listen_and_serv") {
listen_op_ = std::move(op);
} else {
backward_ops_.push_back(std::move(op));
}
}
} else if (is_last_stage) {
VLOG(0) << "HeterSectionWorker::Initialize for the last stage";
for (auto& op_desc : program_->Block(0).AllOps()) {
auto op = std::move(OpRegistry::CreateOp(*op_desc));
auto op_type = op->Type();
if (listen_op_ == nullptr && op_type == "heter_listen_and_serv") {
listen_op_ = std::move(op);
} else {
forward_ops_.push_back(std::move(op));
}
}
VLOG(0) << "test111";
for (auto& op_desc : program_->Block(1).AllOps()) {
auto op = std::move(OpRegistry::CreateOp(*op_desc));
backward_ops_.push_back(std::move(op));
}
}
}
#else
void HeterSectionWorker::Initialize(const TrainerDesc& desc) {
trainer_desc_ = desc;
fetch_config_ = desc.fetch_config();
......@@ -122,6 +168,7 @@ void HeterSectionWorker::Initialize(const TrainerDesc& desc) {
}
}
}
#endif
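Under PADDLE_WITH_FLPS the Initialize above splits the two program blocks into forward ops, backward ops and a single heter_listen_and_serv op, with the split depending on whether the worker sits in the first or the last pipeline stage. A condensed Python sketch of that classification (op types are plain strings here, not real OperatorBase objects):

def classify_ops(block0_ops, block1_ops, is_first_stage):
    forward, backward, listen = [], [], None
    if is_first_stage:
        forward = list(block0_ops)                       # Block(0): pure forward
        for op in block1_ops:                            # Block(1): listen op + backward
            if listen is None and op == "heter_listen_and_serv":
                listen = op
            else:
                backward.append(op)
    else:  # last stage
        for op in block0_ops:                            # Block(0): listen op + forward
            if listen is None and op == "heter_listen_and_serv":
                listen = op
            else:
                forward.append(op)
        backward = list(block1_ops)                      # Block(1): pure backward
    return forward, backward, listen

fwd, bwd, listen = classify_ops(["mul", "softmax"],
                                ["heter_listen_and_serv", "mul_grad"], True)
print(listen, len(fwd), len(bwd))   # heter_listen_and_serv 2 1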
void HeterSectionWorker::RunBackward(int micro_id) {
for (size_t i = 0; i < backward_ops_.size(); i++) {
......@@ -147,8 +194,11 @@ void HeterSectionWorker::RunBackward(int micro_id) {
void HeterSectionWorker::MiniBatchBarrier() {
// get micro id & deserialize data
std::set<int> micro_ids;
VLOG(4) << "entering MiniBatchBarrier";
VLOG(4) << "micro_ids_.size(): " << micro_ids_.size();
while (micro_ids.size() < micro_ids_.size()) {
auto task = (*thread_queue_).Pop();
VLOG(4) << "got one task from task que in cpu worker";
auto message_name = task.first;
auto micro_id = task.second;
PADDLE_ENFORCE_EQ(message_name.find("backward") != std::string::npos, true,
......@@ -164,19 +214,44 @@ void HeterSectionWorker::MiniBatchBarrier() {
RunBackward(micro_id);
batch_num_++;
BatchPostProcess();
VLOG(0) << "one task in cpu worker overed!";
}
micro_ids_.clear();
}
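MiniBatchBarrier blocks the party-A worker until every micro id it launched has come back as a "backward" message, running the backward pass for each one as it arrives. A minimal sketch of the same barrier with queue.Queue standing in for the C++ BlockingQueue:

import queue

def minibatch_barrier(task_queue, launched_micro_ids, run_backward):
    # wait until every launched micro id has reported a backward task
    finished = set()
    while len(finished) < len(launched_micro_ids):
        message_name, micro_id = task_queue.get()   # blocks, like BlockingQueue::Pop
        assert "backward" in message_name, "cpu worker expects backward messages only"
        finished.add(micro_id)
        run_backward(micro_id)

# toy usage
q = queue.Queue()
for mid in (0, 1, 2):
    q.put(("backward_joint_2_1@fl_ps", mid))
minibatch_barrier(q, {0, 1, 2}, lambda mid: print("backward for micro", mid))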
void HeterSectionWorker::RunListen() { listen_op_->Run(*root_scope_, place_); }
void HeterSectionWorker::RunListen() {
VLOG(4) << ">>> run listen_op";
listen_op_->Run(*root_scope_, place_);
VLOG(4) << "<<< run listen_op over";
}
void HeterSectionWorker::RunForward(int micro_id) {
if (pipeline_stage_ == 0) {
#ifdef PADDLE_WITH_FLPS
BindingDataFeedMemory(micro_id);
if (debug_) {
timeline_.Start();
}
int cur_micro_batch = device_reader_->Next();
if (cur_micro_batch <= 0) {
VLOG(0) << "no more data in device_reader_";
epoch_finish_ = true;
return;
}
if (debug_) {
timeline_.Pause();
read_time_ += timeline_.ElapsedSec();
total_time_ += timeline_.ElapsedSec();
total_ins_num_ += cur_micro_batch;
}
VLOG(3) << "read a batch in thread " << thread_id_ << " micro " << micro_id;
#else
if (pipeline_stage_ == 0) {
BindingDataFeedMemory(micro_id);
if (debug_) {
timeline_.Start();
}
int cur_micro_batch =
device_reader_->Next(); // batch_size is just micro_batch_size
if (cur_micro_batch <= 0) {
epoch_finish_ = true;
return;
......@@ -189,6 +264,7 @@ void HeterSectionWorker::RunForward(int micro_id) {
}
VLOG(3) << "read a batch in thread " << thread_id_ << " micro " << micro_id;
}
#endif
for (size_t i = 0; i < forward_ops_.size(); i++) {
auto& op = forward_ops_[i];
VLOG(3) << "Forward: start to run op " << op->Type() << " for micro-batch "
......@@ -301,7 +377,7 @@ void HeterSectionWorker::Run() {
while (!epoch_finish_) {
// forward
for (int i = 0; i < num_microbatches_; i++) {
VLOG(5) << "Run " << i << " microbatch";
VLOG(4) << "Run " << i << " microbatch";
RunForward(i);
if (epoch_finish_ == true) {
break;
......@@ -312,15 +388,19 @@ void HeterSectionWorker::Run() {
if (micro_ids_.size() > 0) {
MiniBatchBarrier();
}
VLOG(0) << "one batch run over! micro_ids_size: " << micro_ids_.size();
}
} else { // for heter worker
VLOG(4) << "entering heter Run...";
auto heter_server = paddle::distributed::HeterServer::GetInstance();
while (true) {
if (heter_server->IsStop()) {
VLOG(0) << "heter_server is stopped!!";
epoch_finish_ = true;
break;
}
auto task = (*thread_queue_).Pop();
VLOG(4) << "got one task from task que in heter worker";
auto message_name = task.first;
auto micro_id = task.second;
if (is_last_stage) {
......@@ -331,6 +411,8 @@ void HeterSectionWorker::Run() {
RunBackward(micro_id);
batch_num_++;
BatchPostProcess();
VLOG(0) << "one batch run over! micro_id: " << micro_id
<< " batch_num: " << batch_num_;
} else {
if (message_name.find("forward") != std::string::npos) {
RunForward(micro_id);
......@@ -371,6 +453,7 @@ void HeterSectionWorker::BatchPostProcess() {
}
void HeterSectionWorker::TrainFiles() {
VLOG(4) << "entering HeterSectionWorker::TrainFiles";
if (thread_id_ >= 0) {
total_ins_num_ = 0;
batch_num_ = 0;
......@@ -378,9 +461,17 @@ void HeterSectionWorker::TrainFiles() {
timeline_.Start();
VLOG(3) << "begin section_worker TrainFiles";
epoch_finish_ = false;
#ifdef PADDLE_WITH_FLPS
if (device_reader_ == nullptr) {
VLOG(4) << "device_reader_ is null!!";
}
device_reader_->Start();
#else
if (pipeline_stage_ == 0) {
device_reader_->Start();
}
#endif
VLOG(4) << "Run in TrainFiles:";
while (!epoch_finish_) {
Run();
dev_ctx_->Wait();
......@@ -428,9 +519,13 @@ void HeterSectionWorker::TrainFilesWithProfiler() {
total_ins_num_ = 0;
op_name_.clear();
op_total_time_.clear();
#ifdef PADDLE_WITH_FLPS
device_reader_->Start();
#else
if (pipeline_stage_ == 0) {
device_reader_->Start();
}
#endif
while (!epoch_finish_) {
Run();
dev_ctx_->Wait();
......
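On the heter (party-B) side the worker loops on its thread queue: the last stage runs forward and then backward for every popped micro id, earlier stages dispatch on whether the message name contains "forward" or "backward", and the loop exits when the HeterServer reports stop. A condensed sketch of that dispatch, with the stop check reduced to a None sentinel:

import queue

def heter_worker_loop(task_queue, is_last_stage, run_forward, run_backward):
    while True:
        task = task_queue.get()
        if task is None:                 # stand-in for HeterServer::IsStop()
            break
        message_name, micro_id = task
        if is_last_stage:
            run_forward(micro_id)        # joint forward for this micro batch
            run_backward(micro_id)       # and its backward right after
        elif "forward" in message_name:
            run_forward(micro_id)
        elif "backward" in message_name:
            run_backward(micro_id)

q = queue.Queue()
q.put(("forward_joint_1_2@fl_ps", 0))
q.put(None)
heter_worker_loop(q, False, lambda m: print("fwd", m), lambda m: print("bwd", m))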
......@@ -1318,6 +1318,18 @@ class DistributedStrategy(object):
"""
return self.strategy.pipeline
@property
def is_fl_ps_mode(self):
return self.strategy.is_fl_ps_mode
@is_fl_ps_mode.setter
@is_strict_auto
def is_fl_ps_mode(self, flag):
if isinstance(flag, bool):
self.strategy.is_fl_ps_mode = flag
else:
print("WARNING: is_fl_ps_mode should have value of bool type")
@pipeline.setter
@is_strict_auto
def pipeline(self, flag):
......
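Together with the new proto field, the property above lets a user program switch the mode on through DistributedStrategy before building the distributed optimizer; the flag then reaches the pass context as attrs['is_fl_ps_mode'] in the ParameterServerOptimizer hunk below. A minimal usage sketch, assuming the usual fleet PS setup (only the flag itself is the point here):

import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()
fleet.init()

strategy = fleet.DistributedStrategy()
strategy.a_sync = True            # parameter-server async training
strategy.is_fl_ps_mode = True     # enable the federated-learning PS path added by this commit

# optimizer = fleet.distributed_optimizer(paddle.optimizer.SGD(0.01), strategy)
# optimizer.minimize(loss)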
......@@ -204,6 +204,26 @@ class UtilBase(object):
def _scatter(self):
pass
def get_heter_file_shard(self, files):
if not isinstance(files, list):
raise TypeError("files should be a list of file need to be read.")
trainers = self.role_maker._worker_num()
trainer_id = self.role_maker._worker_index() - trainers
remainder = len(files) % trainers
blocksize = int(len(files) / trainers)
blocks = [blocksize] * trainers
for i in range(remainder):
blocks[i] += 1
trainer_files = [[]] * trainers
begin = 0
for i in range(trainers):
trainer_files[i] = files[begin:begin + blocks[i]]
begin += blocks[i]
return trainer_files[trainer_id]
def get_file_shard(self, files):
"""
Split files before distributed training, and return filelist assigned to the current trainer.
......
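get_heter_file_shard splits the file list into near-equal blocks and indexes them with the heter worker's offset id (worker_index minus worker_num). A standalone sketch of the same block arithmetic, handy for checking the distribution by hand:

def shard_files(files, trainers, trainer_offset_id):
    # same block split as get_heter_file_shard
    remainder = len(files) % trainers
    blocksize = len(files) // trainers
    blocks = [blocksize + (1 if i < remainder else 0) for i in range(trainers)]
    shards, begin = [], 0
    for b in blocks:
        shards.append(files[begin:begin + b])
        begin += b
    return shards[trainer_offset_id]

# 7 files over 3 heter trainers: shard sizes 3, 2, 2
files = ["part-%d" % i for i in range(7)]
print([shard_files(files, 3, t) for t in range(3)])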
......@@ -75,6 +75,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
"use_ps_gpu"]
attrs['lr_decay_steps'] = self.user_defined_strategy.a_sync_configs[
"lr_decay_steps"]
attrs['is_fl_ps_mode'] = self.user_defined_strategy.is_fl_ps_mode
attrs['k_steps'] = self.user_defined_strategy.a_sync_configs["k_steps"]
attrs['launch_barrier'] = self.user_defined_strategy.a_sync_configs[
"launch_barrier"]
......
......@@ -17,9 +17,11 @@ import paddle
import paddle.compat as cpt
from ..ps.utils.public import *
from paddle.framework import core
from .pass_base import PassBase, register_pass
from paddle.distributed.passes.pass_base import PassBase, register_pass
from paddle.fluid.transpiler.details.program_utils import delete_ops
from paddle.fluid.transpiler.collective import SingleProcessMultiThread
from _collections import deque, defaultdict
from paddle.fluid.framework import Program, Parameter
@register_pass("append_send_ops_pass")
......@@ -47,7 +49,6 @@ class AppendSendOpsPass(PassBase): # this pass is reused by multiple modes
if ps_mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]:
dummy_output = program.global_block().create_var(
name=framework.generate_control_dev_var_name())
logger.info("dummy_output: {}".format(dummy_output))
program.global_block().append_op(
type="send",
inputs={"X": send_input_vars},
......@@ -74,31 +75,27 @@ class AppendSendOpsPass(PassBase): # 该 pass 被多种模式复用
def _apply_single_impl(self, main_program, startup_program, pass_ctx):
attrs = pass_ctx._attrs
print("pass loss program id:", id(attrs['loss'].block.program))
print("pass main program id:", id(main_program))
ps_mode = attrs['ps_mode']
if ps_mode == DistributedMode.GEO:
send_ctx = get_geo_trainer_send_context(attrs) # GEO mode
elif attrs['is_heter_ps_mode'] == True:
print("is_heter_ps_mode in append_send_ops_pass!!")
send_ctx = get_the_one_send_context(attrs, split_dense_table=True)
else:
send_ctx = get_the_one_send_context(attrs) # async, sync and other modes
logger.info("send_ctx: {}".format(send_ctx))
dummys = []
for merged_name, send in send_ctx.items():
if send.is_sparse() and ps_mode != DistributedMode.GEO:
continue
if send.program_id() != id(attrs['loss'].block.program):
continue
logger.info('merged_name, send: {}, {}'.format(merged_name, send))
is_sparse = 1 if send.is_sparse() else 0
is_sparse = 2 if send.is_distributed() else is_sparse
dummys.append(
self._append_send_op(main_program,
send.origin_varnames(), merged_name,
is_sparse, send.table_id(), ps_mode))
logger.info('ps trainer pass - ps mode: {}'.format(ps_mode))
logger.info('dummys: {}'.format(dummys))
if ps_mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]:
logger.info('insert send_barrier_op')
trainer_id = get_role_id(attrs['role_maker'])
self._append_barrier_op(main_program, dummys, trainer_id)
......@@ -453,6 +450,8 @@ class DistributedOpsPass(PassBase):
attrs = pass_ctx._attrs
pull_sparse_ops, push_sparse_ops, use_cvm_op = self._get_pull_sparse_ops(
main_program, attrs)
print("is_heter_ps_mode in distributed_ops_pass {}?".format(attrs[
'is_heter_ps_mode']))
send_ctx = get_the_one_send_context(
attrs, split_dense_table=attrs['is_heter_ps_mode'])
self._pull_sparse_fuse(main_program, pull_sparse_ops, attrs, send_ctx)
......@@ -505,7 +504,6 @@ class DeleteOptimizesPass(PassBase):
persistable=True)
def _apply_single_impl(self, main_program, startup_program, pass_ctx):
print("delete_optimizer_pass")
attrs = pass_ctx._attrs
optimizer_ops = get_optimize_ops(main_program)
lr_ops = get_lr_ops(main_program)
......@@ -833,9 +831,9 @@ class SplitHeterWorkerOpsPass(PassBase):
block_var_detail, current_device, False)
# add send op
send_grad_var_list = add_heter_send_op(program, heter_program,
heter_block_bp,
block_var_detail[stage_id - 1])
send_grad_var_list = add_send_op(
program, heter_block_bp,
block_var_detail[stage_id - 1]["backward"]["persistables"])
# add step counter
send_input_vars = []
......@@ -909,7 +907,7 @@ class SplitTrainerOpsPass(PassBase):
first_op_idx = all_op.index(op)
break
assert first_op_idx != -1
self._delete_same_ops(program.global_block(), ops_list)
delete_same_ops(program.global_block(), ops_list)
entrance_var = []
role_maker = attrs['role_maker']
......@@ -939,17 +937,6 @@ class SplitTrainerOpsPass(PassBase):
return entrance_var
def _delete_same_ops(self, block, ops):
for op in ops:
try:
for origin_op in block.ops:
if str(origin_op) == str(op):
idx = list(block.ops).index(origin_op)
block._remove_op(idx)
break
except Exception as e:
print(e)
def _remove_var_pair_by_grad(self, var_name, attrs):
for index, pair in enumerate(attrs['merged_variables_pairs']):
var = pair[0]
......@@ -1019,7 +1006,7 @@ class SplitTrainerOpsPass(PassBase):
grad_to_block_id = []
bp_ops_list = program_block_ops_list[0]["backward"]
self._delete_same_ops(program.global_block(), bp_ops_list)
delete_same_ops(program.global_block(), bp_ops_list)
delete_trainer_useless_var(program, static_var)
backward_block = create_backward_block(program, origin_program,
bp_ops_list, block_var_detail)
......@@ -1093,12 +1080,13 @@ class SetHeterPipelineOptPass(PassBase):
num_microbatches = attrs['user_defined_strategy'].pipeline_configs[
'accumulate_steps']
attrs['origin_startup_program']._heter_pipeline_opt = {
startup_program._heter_pipeline_opt = {
"startup_program": startup_program,
"pipeline_stage": int(role_maker._get_stage_id()) - 1,
"heter_place": role_maker._heter_device(),
"is_fl_mode": 1
}
attrs['origin_main_program']._heter_pipeline_opt = {
main_program._heter_pipeline_opt = {
"trainer": "HeterPipelineTrainer",
"device_worker": "HeterSection",
"trainers":
......@@ -1109,4 +1097,313 @@ class SetHeterPipelineOptPass(PassBase):
"section_program": main_program,
"num_microbatches": num_microbatches,
"heter_place": role_maker._heter_device(),
"is_fl_mode": 1
}
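set_heter_pipeline_opt_pass now hangs the _heter_pipeline_opt dicts off the cloned startup and main programs themselves (rather than the origin programs) and adds "is_fl_mode": 1, which the executor hunk further down uses to let stage > 0 keep a dataset. A hedged sketch of the shape of the main-program dict as written here (values are placeholders; elided keys are omitted):

# illustrative only - keys mirror the ones visible in SetHeterPipelineOptPass
heter_pipeline_opt = {
    "trainer": "HeterPipelineTrainer",
    "device_worker": "HeterSection",
    "trainers": [2, 2],          # party-A and party-B trainer counts in an N + N run
    "section_program": None,     # the (cloned) main program of this stage
    "num_microbatches": 4,       # accumulate_steps from pipeline_configs
    "heter_place": "gpu:0",
    "is_fl_mode": 1,             # new flag consumed by Executor in fl-ps mode
}
# the startup program gets a smaller dict: startup_program, pipeline_stage,
# heter_place and the same is_fl_mode flag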
@register_pass("split_fl_ops_pass")
class SplitFlOpsPass(PassBase):
def __init__(self):
super(SplitFlOpsPass, self).__init__()
self.PART_A_DEVICE_FlAG = 'gpu:0'
self.PART_A_JOINT_OP_DEVICE_FlAG = 'gpu:2'
self.PART_B_DEVICE_FlAG = 'gpu:1'
self.PART_B_JOINT_OP_DEVICE_FlAG = 'gpu:3'
def _check_self(self):
return True
def _check_conflict(self, other_pass):
return True
def _insert_encrypt_op(self):
pass
def _insert_decrypt_op(self):
pass
def _clear_op_device_flag(self, program):
for block in program.blocks:
for op in block.ops:
device = op.attr(OP_DEVICE_KEY)
op._set_attr(OP_DEVICE_KEY, '') if device != '' else None
def _split_fl_program(self):
self.partA_ops = []
self.partB_ops = []
party_program_map = defaultdict(Program)
block = self.ori_main_program.block(0)
for op in block.ops:
device = op.attr(OP_DEVICE_KEY)
if device == self.PART_A_DEVICE_FlAG or device == '' or device == self.PART_A_JOINT_OP_DEVICE_FlAG:
program = party_program_map['a']
self.partA_ops.append(op)
elif device == self.PART_B_DEVICE_FlAG or device == self.PART_B_JOINT_OP_DEVICE_FlAG:
program = party_program_map['b']
self.partB_ops.append(op)
op_desc = op.desc
ap_op = program.global_block().desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(OP_DEVICE_KEY, device)
for key in ['a', 'b']:
program = party_program_map[key]
program._sync_with_cpp()
return party_program_map
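SplitFlOpsPass partitions the original block-0 ops into a party-A and a party-B program purely by the op_device attribute: 'gpu:0' (and unset) go to A, 'gpu:1' to B, while 'gpu:2' / 'gpu:3' mark the joint ops on each side where the cut happens. A plain-Python sketch of that partition rule:

PART_A = {"gpu:0", "", "gpu:2"}   # party A ops; '' = no device flag, gpu:2 = A-side joint op
PART_B = {"gpu:1", "gpu:3"}       # party B ops; gpu:3 = B-side joint op

def split_by_device(ops):
    """ops: list of (op_name, device) pairs; returns {'a': [...], 'b': [...]}."""
    parties = {"a": [], "b": []}
    for name, device in ops:
        key = "a" if device in PART_A else "b"
        parties[key].append(name)
    return parties

ops = [("lookup_table", "gpu:0"), ("mul", "gpu:0"),
       ("elementwise_add", "gpu:3"), ("softmax", "gpu:1")]
print(split_by_device(ops))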
def _insert_partA_communicate_op(self, block, idx):
comm_info = "forward_joint_{}_{}@fl_ps".format(1, 2)
block._insert_op(
idx,
type='send_and_recv',
inputs={'X': self.partA_to_partB_tensor},
outputs={'Out': []},
attrs={
'mode': 'forward', # mode directly determines the forward vs. backward channel selection
'send_var_name':
self.partA_to_partB_tensor_name + ["microbatch_id"],
'recv_var_name': [],
'message_name': comm_info,
'next_endpoints':
get_next_stage_trainers(self.role_maker), # partB_endpoints
'previous_endpoints':
get_previous_stage_trainers(self.role_maker),
'trainer_id': get_role_id(self.role_maker), # global id
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
})
return
def _insert_partB_communicate_op(self, block, idx):
comm_info = ("backward_joint_{}_{}@fl_ps".format(2, 1))
block._insert_op(
idx,
type='send_and_recv',
inputs={'X': self.partB_to_partA_grad},
outputs={'Out': []},
attrs={
'mode': 'backward',
'send_var_name':
self.partB_to_partA_grad_name + ["microbatch_id"],
'recv_var_name': [],
'message_name': comm_info,
'next_endpoints':
get_next_stage_trainers(self.role_maker), # partA_endpoints
'previous_endpoints':
get_previous_stage_trainers(self.role_maker),
'trainer_id': get_role_id(self.role_maker), # global id
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
})
return
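The two communicate ops use a fixed message-name convention: "forward_joint_1_2@fl_ps" for the A-to-B activation push and "backward_joint_2_1@fl_ps" for the B-to-A gradient push; on the receiving side the same string, suffixed with ":<block idx>", becomes message_to_block_id for heter_listen_and_serv, which is how a message is routed to the right block. A tiny helper sketch of the convention:

def joint_message(direction, src_stage, dst_stage):
    # direction: "forward" or "backward"; stages are 1-based as in this pass
    return "{}_joint_{}_{}@fl_ps".format(direction, src_stage, dst_stage)

def message_to_block_id(message_name, block_idx):
    # value placed into the heter_listen_and_serv attrs on the receiving side
    return message_name + ":" + str(block_idx)

assert joint_message("forward", 1, 2) == "forward_joint_1_2@fl_ps"
assert message_to_block_id(joint_message("backward", 2, 1), 1) == "backward_joint_2_1@fl_ps:1"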
def _create_var_for_block(self, vars, block):
for var in vars:
if block._find_var_recursive(str(var)):
continue
source_var = self.ori_main_block._var_recursive(str(var))
if isinstance(var, Parameter):
dest_var = block.create_parameter(
name=source_var.name,
shape=source_var.shape,
dtype=source_var.dtype,
type=source_var.type,
lod_level=source_var.lod_level,
stop_gradient=source_var.stop_gradient,
trainable=source_var.trainable,
optimize_attr=source_var.optimize_attr,
regularizer=source_var.regularizer,
error_clip=source_var.error_clip)
else:
dest_var = block._clone_variable(source_var, False)
dest_var.stop_gradient = source_var.stop_gradient
if hasattr(source_var, 'is_distributed'):
dest_var.is_distributed = source_var.is_distributed
def _get_block_by_idx(self, op_list, program, block_idx):
if block_idx < len(program.blocks):
new_block = program.block(block_idx)
else:
new_block = program._create_block()
for _, op in enumerate(op_list):
ap_op = new_block.desc.append_op()
ap_op.copy_from(op.desc)
ap_op._set_attr(OP_DEVICE_KEY, op.attr(OP_DEVICE_KEY))
vars = op.desc.input_arg_names() + op.desc.output_arg_names()
self._create_var_for_block(vars, new_block)
new_block._sync_with_cpp()
return new_block
def _find_joint_forward_op(self, block, flag):
op_idx = 0
for op in block.ops:
if is_forward_op(op) and op.attr(OP_DEVICE_KEY) == flag:
return op_idx
else:
op_idx += 1
return op_idx
def _find_joint_backward_op(self, block, flag):
op_idx = 0
for op in block.ops:
if is_backward_op(op) and op.attr(OP_DEVICE_KEY) == flag:
return op_idx
else:
op_idx += 1
return op_idx
def _get_partB_to_partA_grad(self, block, flag):
op_idx = self._find_joint_backward_op(block, flag)
op = block.ops[op_idx]
vars1 = op.desc.input_arg_names()
op_idx = self._find_joint_forward_op(block, flag)
op = block.ops[op_idx]
vars2 = op.desc.output_arg_names()
self.partB_to_partA_grad_name = list(set(vars1) - set(vars2))
self.partB_to_partA_grad = []
for var_name in self.partB_to_partA_grad_name:
self.partB_to_partA_grad.append(self.ori_main_block.var(var_name))
def _find_dense_grad_vars(self, bp_op_list):
program = self.ori_main_program
bp_op_input, bp_op_output = find_ops_list_input_output(program,
bp_op_list)
return (screen_persistables(program, bp_op_input) + screen_persistables(
program, bp_op_output))
def _get_partA_program(self, block):
# 1. create block 0
# 1.1 insert send op
op_idx = self._find_joint_forward_op(block,
self.PART_A_JOINT_OP_DEVICE_FlAG)
op_list = []
for i in range(len(block.ops)):
op = block.ops[i]
op_list.append(op)
if i == op_idx:
out_name = op.desc.output_arg_names()[0]
self.partA_to_partB_tensor_name = op.desc.output_arg_names()
self.partA_to_partB_tensor = self.ori_main_block.var(out_name)
break
first_block = self._get_block_by_idx(op_list, self.partA_program, 0)
self._insert_partA_communicate_op(first_block, op_idx + 1)
# logger.info('partA-first_block:{}'.format(first_block))
# 2. create block 1
bp_op_list = get_bp_op_list(block)
push_sparse_op_list = get_distributed_push_sparse_op_list(block)
# logger.info('bp_op_list: {}'.format(bp_op_list))
second_block = self._get_block_by_idx(bp_op_list + push_sparse_op_list,
self.partA_program, 1)
# 2.1. insert partA recv op
block_input_flag = "backward_joint_{}_{}@fl_ps".format(2, 1)
grad_to_block_id = block_input_flag + ":" + str(second_block.idx)
attrs = {
"message_to_block_id": [grad_to_block_id],
"optimize_blocks": [second_block],
"endpoint": get_trainer_endpoint(self.role_maker), ##
"fanin": 0,
"pserver_id": get_role_id(self.role_maker),
"distributed_mode": self.ps_mode,
"rpc_exec_thread_num": int(os.getenv("CPU_NUM", 32)),
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
}
second_block._insert_op(
index=0,
type='heter_listen_and_serv',
inputs={'X': []},
outputs={},
attrs=attrs)
# 2.2 insert push dense grad op
send_ops = find_send_op(self.ori_main_program) # push dense
delete_same_ops(block, send_ops)
dense_grad_vars = self._find_dense_grad_vars(bp_op_list)
add_send_op(self.ori_main_program, second_block, dense_grad_vars)
# logger.info('partA-second_block:{}'.format(second_block))
def _get_partB_program(self, block):
op_idx1 = self._find_joint_forward_op(
block, self.PART_B_JOINT_OP_DEVICE_FlAG) # elementwise_add op
op_idx2 = self._find_joint_backward_op(block,
self.PART_B_JOINT_OP_DEVICE_FlAG)
op_cnt = 0
op_list1 = []
op_list2 = []
op_list3 = []
for op in block.ops:
if op_cnt < op_idx1:
op_list1.append(op)
elif op_cnt <= op_idx2:
op_list2.append(op)
else:
op_list3.append(op)
op_cnt += 1
# 1. create block 0
first_block = self._get_block_by_idx(op_list1, self.partB_program, 0)
# 2. create block 1
second_block = self._get_block_by_idx(op_list2, self.partB_program, 1)
# 2.1 insert send op
self._insert_partB_communicate_op(second_block, len(op_list2))
# 2.2 insert remain ops
second_block = self._get_block_by_idx(op_list3, self.partB_program, 1)
# 2.3 insert push dense grad op
bp_op_list = get_bp_op_list(second_block)
dense_grad_vars = self._find_dense_grad_vars(bp_op_list)
add_send_op(self.ori_main_program, second_block, dense_grad_vars)
# 3. insert partB recv op
block_input_flag = "forward_joint_{}_{}@fl_ps".format(1, 2)
grad_to_block_id = block_input_flag + ":" + str(second_block.idx)
attrs = {
"message_to_block_id": [grad_to_block_id],
"optimize_blocks": [second_block], ## what to do?
"endpoint": get_heter_worker_endpoint(self.role_maker),
"fanin": len(get_previous_stage_trainers(self.role_maker)),
"pserver_id": 1, # TODO
"distributed_mode": self.ps_mode,
"rpc_exec_thread_num": int(os.getenv("CPU_NUM", 32)),
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
}
first_block._insert_op(
index=len(op_list1),
type="heter_listen_and_serv",
inputs={'X': []},
outputs={},
attrs=attrs)
#logger.info('partB-first_block:{}'.format(first_block))
#logger.info('partB-second_block:{}'.format(second_block))
def _apply_single_impl(self, main_program, startup_program, pass_ctx):
attrs = pass_ctx._attrs
self.role_maker = attrs['role_maker']
self.ps_mode = attrs['ps_mode']
self.is_part_b = attrs['is_heter_worker'] # TODO
self.ori_main_program = main_program
self.ori_main_block = main_program.block(0)
party_program_map = self._split_fl_program()
prog_a = party_program_map['a']
_main_file = ps_log_root_dir + '6_fl_A_main_program.prototxt'
debug_program(_main_file, prog_a)
self._get_partB_to_partA_grad(prog_a.global_block(),
self.PART_A_JOINT_OP_DEVICE_FlAG)
prog_b = party_program_map['b']
_main_file = ps_log_root_dir + '6_fl_B_main_program.prototxt'
debug_program(_main_file, prog_b)
if not self.is_part_b:
self.partA_program = framework.Program()
self._get_partA_program(prog_a.global_block())
pass_ctx._attrs['part_a_main_program'] = self.partA_program
self._clear_op_device_flag(self.partA_program)
check_program(self.partA_program)
else:
self.partB_program = framework.Program()
self._get_partB_program(prog_b.global_block())
pass_ctx._attrs['part_b_main_program'] = self.partB_program
self._clear_op_device_flag(self.partB_program)
check_program(self.partB_program)
......@@ -732,6 +732,8 @@ class PsDescBuilder(object):
self.is_heter_ps_mode = context['is_heter_ps_mode']
self.use_ps_gpu = context['use_ps_gpu']
self.barrier_table_id = None
print("is_heter_ps_mode in the_one_ps.py? {}".format(
self.is_heter_ps_mode))
self.send_ctx = get_the_one_send_context(
self.context,
use_origin_program=True,
......@@ -772,6 +774,7 @@ class PsDescBuilder(object):
self.tensor_tables = self._get_tensor_tables()
tables.extend(self.tensor_tables)
tables.append(globals()['BarrierTable'](self.context, len(tables)))
print("test_fl_ps: tables len: {}".format(len(tables)))
return tables
def _get_service(self):
......@@ -864,7 +867,7 @@ class TheOnePSRuntime(RuntimeBase):
scope = scopes[idx]
table_id = ctx.table_id()
var_names = recv_map[table_id]
# print("init params:", idx, table_id, var_names)
#print("init params:", idx, table_id, var_names)
self._worker.push_dense_params(scope, table_id, var_names)
def _pull_all_dense(self, scopes, send_ctx, recv_map):
......@@ -875,7 +878,7 @@ class TheOnePSRuntime(RuntimeBase):
scope = scopes[idx]
table_id = ctx.table_id()
var_names = recv_map[table_id]
# print("pull all dense:", idx, table_id, var_names)
#print("pull all dense:", idx, table_id, var_names)
self._worker.pull_dense_params(scope, table_id, var_names)
def _init_params(self, program, scope, send_ctx, recv_map):
......@@ -902,7 +905,8 @@ class TheOnePSRuntime(RuntimeBase):
def _init_worker(self, scopes=None):
worker_desc = self.ps_desc_builder.build_worker_desc()
#with open("test_fl_ps_worker_desc", "w") as f:
# f.write(worker_desc)
if self.context['use_ps_gpu']:
main_program = self.context['loss'].block.program
if not main_program._fleet_opt:
......@@ -955,7 +959,8 @@ class TheOnePSRuntime(RuntimeBase):
role_id = get_role_id(self.role_maker)
self._worker.init_worker(proto_txt, self.string_hosts, role_id)
if self.context['ps_mode'] == DistributedMode.GEO:
if self.context[
'ps_mode'] == DistributedMode.GEO or self.is_heter_ps_mode:
self._communicator = Communicator(
trainer_config.mode, kwargs,
trainer_config.get_communicator_flags())
......@@ -1010,19 +1015,27 @@ class TheOnePSRuntime(RuntimeBase):
self.scopes = scopes
if not is_test:
if self.context['ps_mode'] == DistributedMode.GEO:
if self.context[
'ps_mode'] == DistributedMode.GEO or self.is_heter_ps_mode == True:
self._communicator.init_params(init_params)
else:
if not self.context['use_ps_gpu']:
if role_id == 0:
print("entering self._init_all_params()")
self._init_all_params(scopes, send_ctx, dense_map)
fleet.util.barrier()
fleet.util.barrier() # make sure worker 0 has finished push_dense_param
if not self.context['use_ps_gpu']:
if self.is_heter_ps_mode == True and not self.role_maker._is_first_worker(
):
self._communicator.pull_dense(init_params)
else:
self._pull_all_dense(scopes, send_ctx, dense_map)
fleet.util.barrier()
if self.context['ps_mode'] == DistributedMode.GEO:
if self.context[
'ps_mode'] == DistributedMode.GEO or self.is_heter_ps_mode == True:
if not self._communicator.is_running():
self._communicator.start()
else:
......@@ -1031,7 +1044,6 @@ class TheOnePSRuntime(RuntimeBase):
launch_barrier = dist_strategy.a_sync_configs["launch_barrier"]
launch_barrier_flag = int(os.getenv("FLAGS_LAUNCH_BARRIER", "1"))
if launch_barrier and launch_barrier_flag:
# for trainer wait server ready
wait_server_ready(self.role_maker._get_pserver_endpoints())
if self.is_heter_ps_mode and self.role_maker._get_next_trainers(
) != []:
......@@ -1043,12 +1055,14 @@ class TheOnePSRuntime(RuntimeBase):
next_trainers = []
if self.role_maker._get_next_trainers() != []:
next_trainers = self.role_maker._get_next_trainers()
self._heter_client = HeterClient(next_trainers,
previous_trainers,
self.role_maker._role_id())
self._heter_client = HeterClient(
next_trainers, previous_trainers,
self.role_maker._role_id()) # --> HeterClient::GetInstance
def _init_server(self, dirname=None, var_names=None, **kwargs):
server_desc = self.ps_desc_builder.build_server_desc()
#with open("test_fl_ps_server_desc", "w") as f:
# f.write(server_desc)
role_id = get_role_id(self.role_maker)
trainers = get_trainers(self.role_maker)
if self.is_heter_ps_mode:
......
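In heter/FL PS mode _init_worker now also starts the Communicator (previously GEO-only) and builds a HeterClient from the next- and previous-stage trainer endpoints, which lands in the C++ HeterClient::GetInstance(endpoints, previous_endpoints, trainer_id) singleton shown earlier. A hedged sketch of the endpoint wiring for a 2 + 2 (N + N) deployment; all addresses are placeholders:

# party A (CPU) trainers and party B (heter) trainers in a 2 + 2 layout
part_a_trainers = ["10.0.0.1:8000", "10.0.0.2:8000"]
part_b_trainers = ["10.0.1.1:9000", "10.0.1.2:9000"]

def heter_client_args(role, trainer_id):
    # mirrors: next_trainers / previous_trainers fed into HeterClient(...)
    if role == "part_a":           # stage 0 sends forward to party B
        return part_b_trainers, [], trainer_id
    else:                          # stage 1 sends backward to party A
        return [], part_a_trainers, trainer_id

print(heter_client_args("part_a", 0))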
......@@ -33,10 +33,9 @@ class PsProgramBuilderFactory(object):
return globals()['GeoPsProgramBuilder'](pass_ctx)
elif attrs['use_ps_gpu']:
return globals()['GpuPsProgramBuilder'](pass_ctx)
elif attrs['is_heter_ps_mode']:
elif attrs['is_heter_ps_mode'] and not attrs['is_fl_ps_mode']:
return globals()['HeterAsyncPsProgramBuilder'](pass_ctx)
elif 'is_fl_ps_mode' in attrs and attrs[
'is_fl_ps_mode'] == DistributedMode.FL:
elif 'is_fl_ps_mode' in attrs and attrs['is_fl_ps_mode']:
return globals()['FlPsProgramBuilder'](pass_ctx)
elif attrs['ps_mode'] == DistributedMode.SYNC:
return globals()['CpuSyncPsProgramBuilder'](pass_ctx)
......
......@@ -23,6 +23,9 @@ class PsProgramBuilder(object):
self.pass_ctx = pass_ctx
self.attrs = self.pass_ctx._attrs
self.loss = self.attrs['loss']
self.origin_startup_program = self.attrs['origin_startup_program']
self.main_program = self.attrs['origin_main_programs']
self.cloned_main = self.attrs['cloned_main']
self.cloned_startup = self.attrs['cloned_startup']
......@@ -30,6 +33,7 @@ class PsProgramBuilder(object):
self.use_heter_ps = self.attrs['is_heter_ps_mode']
self.is_worker = self.attrs['is_worker']
self.is_heter_worker = self.attrs['is_heter_worker']
self.is_server = self.attrs['is_server']
self.ps_mode = self.attrs['ps_mode']
self.launch_barrier = self.attrs['launch_barrier']
......@@ -67,9 +71,10 @@ class PsProgramBuilder(object):
def _build_programs(self):
if self.attrs['is_worker']:
logger.info("start building trainer program")
self._build_trainer_programs()
fluid.framework.switch_startup_program(self.cloned_startup)
print("fluid.default_startup_program: {}".format(
fluid.default_startup_program))
# print("ps_program_build before =", id(self.loss.block.program))
self._build_trainer_desc()
self.loss.block.program = self.cloned_main
......@@ -81,7 +86,6 @@ class PsProgramBuilder(object):
# self.loss.block.program._fleet_opt)
elif self.attrs['is_server']:
logger.info("start building pserver program")
self._build_pserver_programs()
self.loss.block.program = self.attrs['_main_server']
fluid.framework.switch_startup_program(self.attrs[
......@@ -90,7 +94,6 @@ class PsProgramBuilder(object):
class GeoPsProgramBuilder(PsProgramBuilder): # CPU mode only
def __init__(self, pass_ctx):
logger.info("start building geo-ps program")
super(GeoPsProgramBuilder, self).__init__(pass_ctx)
if self.ps_mode != DistributedMode.GEO:
raise ValueError("ps mode: {} not matched {}",
......@@ -105,8 +108,6 @@ class GeoPsProgramBuilder(PsProgramBuilder): # CPU mode only
if self.launch_barrier and self.launch_barrier_flag:
wait_server_ready(self.server_endpoints)
return
def _build_pserver_programs(self):
add_listen_and_serv_pass = new_pass('add_listen_and_serv_pass',
self.attrs)
......@@ -118,8 +119,6 @@ class GeoPsProgramBuilder(PsProgramBuilder): # 仅 CPU 模式
class CpuSyncPsProgramBuilder(PsProgramBuilder):
def __init__(self, pass_ctx):
super(CpuSyncPsProgramBuilder, self).__init__(pass_ctx)
if self.ps_mode == DistributedMode.SYNC:
logger.info("start building cpu-sync-ps program")
if self.ps_mode != DistributedMode.SYNC and self.ps_mode != DistributedMode.ASYNC:
raise ValueError("ps mode: {} not matched {}",
format(self.ps_mode, "PsProgramBuilder"))
......@@ -161,7 +160,6 @@ class CpuSyncPsProgramBuilder(PsProgramBuilder):
class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder):
def __init__(self, pass_ctx):
logger.info("start building cpu-async-ps program")
super(CpuAsyncPsProgramBuilder, self).__init__(pass_ctx)
def _build_trainer_desc(self):
......@@ -198,7 +196,6 @@ class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder):
class GpuPsProgramBuilder(PsProgramBuilder):
def __init__(self, pass_ctx):
logger.info("start building gpu-ps program")
super(GpuPsProgramBuilder, self).__init__(pass_ctx)
def _build_trainer_programs(self):
......@@ -231,12 +228,7 @@ class GpuPsProgramBuilder(PsProgramBuilder):
class HeterAsyncPsProgramBuilder(PsProgramBuilder):
def __init__(self, pass_ctx):
logger.info("start building heter-async-ps program")
super(HeterAsyncPsProgramBuilder, self).__init__(pass_ctx)
if self.use_ps_gpu or self.ps_mode == DistributedMode.GEO or self.attrs[
'is_heter_ps_mode'] == False:
raise ValueError("ps mode: {} not matched {}",
format(self.ps_mode, "HeterAsyncPsProgramBuilder"))
def _build_trainer_programs(self):
add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass",
......@@ -296,15 +288,91 @@ class HeterAsyncPsProgramBuilder(PsProgramBuilder):
'_startup_server'])
class FlPsProgramBuilder(PsProgramBuilder):
class FlPsProgramBuilder(HeterAsyncPsProgramBuilder):
def __init__(self, pass_ctx):
super(FlPsProgramBuilder, self).__init__(pass_ctx)
def _build_trainer_programs(self):
pass
_main_file = ps_log_root_dir + '0_fl_worker_main_program.prototxt'
#debug_program(_main_file, self.cloned_main)
distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)
_main_file = ps_log_root_dir + '1_fl_worker_main_program.prototxt'
#debug_program(_main_file, self.cloned_main)
delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)
_main_file = ps_log_root_dir + '2_fl_worker_main_program.prototxt'
#debug_program(_main_file, self.cloned_main)
append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)
_main_file = ps_log_root_dir + '3_fl_worker_main_program.prototxt'
#debug_program(_main_file, self.cloned_main)
delete_extra_optimizer_pass = new_pass("delete_extra_optimizer_pass",
self.attrs)
delete_extra_optimizer_pass.apply([self.attrs['origin_main_program']],
[self.cloned_startup], self.pass_ctx)
_main_file = ps_log_root_dir + '4_fl_worker_main_program.prototxt'
#debug_program(_main_file, self.cloned_main)
fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)
_main_file = ps_log_root_dir + '5_fl_worker_main_program.prototxt'
#debug_program(_main_file, self.cloned_main)
split_trainer_ops_pass = new_pass("split_fl_ops_pass", self.attrs)
split_trainer_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)
if not self.is_heter_worker:
self.part_a_program = self.pass_ctx._attrs['part_a_main_program']
self.cloned_main = self.part_a_program
_main_file = ps_log_root_dir + '8_fl_A_main_program.prototxt'
debug_program(_main_file, self.cloned_main)
else:
self.part_b_program = self.pass_ctx._attrs['part_b_main_program']
self.cloned_main = self.part_b_program
_main_file = ps_log_root_dir + '8_fl_B_main_program.prototxt'
debug_program(_main_file, self.cloned_main)
set_heter_pipeline_opt_pass = new_pass('set_heter_pipeline_opt_pass',
self.attrs)
set_heter_pipeline_opt_pass.apply([self.cloned_main],
[self.cloned_startup], self.pass_ctx)
self.attrs['origin_startup_program'] = self.cloned_startup
self.attrs['origin_main_program'] = self.cloned_main
if not self.is_heter_worker:
_main_file = ps_log_root_dir + 'final_fl_A_main_program.prototxt'
debug_program(_main_file, self.attrs['origin_main_program']
._heter_pipeline_opt['section_program'])
else:
_main_file = ps_log_root_dir + 'final_fl_B_main_program.prototxt'
debug_program(_main_file, self.attrs['origin_main_program']
._heter_pipeline_opt['section_program'])
return
def _build_pserver_programs(self):
pass
self.loss.block.program = self.attrs['_main_server']
def _build_programs(self):
pass
if not self.is_server:
self._build_trainer_programs()
fluid.framework.switch_startup_program(self.cloned_startup)
fluid.framework.switch_main_program(self.cloned_main)
print("fluid.default_startup_program: {}".format(
fluid.default_startup_program()._heter_pipeline_opt))
else:
self._build_pserver_programs()
fluid.framework.switch_startup_program(self.attrs[
'_startup_server'])
fluid.framework.switch_main_program(self.attrs['_main_server'])
......@@ -37,10 +37,12 @@ LEARNING_RATE_DECAY_COUNTER = "@LR_DECAY_COUNTER@"
OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName()
RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC
op_role = core.op_proto_and_checker_maker.OpRole
op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched
OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize
backward = core.op_proto_and_checker_maker.OpRole.Backward
OP_DEVICE_KEY = core.op_proto_and_checker_maker.kOpDeviceAttrName()
DEVICE_LIST = ["cpu", "gpu", "xpu"]
COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"]
......@@ -91,8 +93,7 @@ class TrainerRuntimeConfig(object):
num_threads = os.getenv("CPU_NUM", "1")
send_queue_size = num_threads
k_steps = valid_strategy.a_sync_configs["k_steps"]
logger.info("ps mode in strategy: {}, {}".format(
valid_strategy.a_sync, valid_strategy.a_sync_configs["k_steps"]))
if not valid_strategy.a_sync and k_steps == 0:
self.mode = DistributedMode.SYNC
......@@ -238,17 +239,11 @@ def get_ps_endpoints(role_maker):
def get_heter_worker_endpoint(role_maker):
try:
return role_maker._get_heter_worker_endpoint()
except Exception:
return role_maker.get_heter_worker_endpoint()
def get_trainer_endpoint(role_maker):
try:
return role_maker._get_trainer_endpoint()
except Exception:
return role_maker.get_trainer_endpoint()
def get_previous_stage_trainers(role_maker):
......@@ -339,8 +334,8 @@ def get_dense_send_context(program,
var_numel += reduce(lambda x, y: x * y, var.shape)
grad_name = "Dense@GRAD_" + str(idx)
aggregate = True
print("public get_dense_send_context dense_table:", grad_name,
var_numel, origin_varnames)
# print("public get_dense_send_context dense_table:", grad_name,
# var_numel, origin_varnames)
from paddle.fluid.core import CommContext
dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
[var_numel], origin_varnames, trainer_id,
......@@ -362,8 +357,8 @@ def get_dense_send_context(program,
var_numel += reduce(lambda x, y: x * y, var.shape)
grad_name = "DataNorm@GRAD_" + str(idx)
aggregate = True
print("public get_dense_send_context data_norm table:", grad_name,
var_numel, origin_varnames)
# print("public get_dense_send_context data_norm table:", grad_name,
# var_numel, origin_varnames)
from paddle.fluid.core import CommContext
data_norm_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
[var_numel], origin_varnames, trainer_id,
......@@ -441,14 +436,15 @@ def _step_ctx(idx, role_maker):
def get_the_one_send_context(context,
split_dense_table=False,
use_origin_program=False,
split_dense_table=False,
ep_list=None):
if ep_list is None:
ep_list = ["127.0.0.1:6071"]
send_ctx = {}
trainer_id = get_role_id(context['role_maker'])
origin_programs = context['origin_main_programs']
print("is_heter_ps_mode? {}".format(split_dense_table))
idx = 0
distibuted_varnames = get_sparse_tablenames(origin_programs, True)
......@@ -471,7 +467,7 @@ def get_the_one_send_context(context,
shape = list(var.shape)
shape[0] = 0 if is_distributed else shape[0]
# print("public get_the_one_send_context sparse:", grad_name,
#print("public get_the_one_send_context sparse:", grad_name,
# splited_varname, shape)
if grad_name in send_ctx:
continue
......@@ -1094,14 +1090,13 @@ def block_append_op(program, origin_program, block, op):
else:
# for grad op
op_desc = op.desc
op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
backward = core.op_proto_and_checker_maker.OpRole.Backward
device_attr_name = core.op_proto_and_checker_maker.kOpDeviceAttrName()
# append grad op
new_op_desc = block.desc.append_op()
new_op_desc.copy_from(op_desc)
new_op_desc._set_attr(op_role_attr_name, backward)
new_op_desc._set_attr(RPC_OP_ROLE_ATTR_NAME, backward)
# set device guard
if op.desc.has_attr(device_attr_name):
......@@ -1422,7 +1417,7 @@ def find_op_input_output(program, block, op):
return input_var_list, output_var_list
def add_heter_send_op(program, heter_program, block, block_var_detail):
def add_send_op(program, block, _vars):
def _get_send_op_dict():
send_op_dict = {}
send_op_list = find_send_op(program)
......@@ -1436,7 +1431,7 @@ def add_heter_send_op(program, heter_program, block, block_var_detail):
send_grad_var_list = []
send_op_dict = _get_send_op_dict()
table_dict = {}
for persistable_var in block_var_detail["backward"]["persistables"]:
for persistable_var in _vars:
if "@GRAD" not in persistable_var:
continue
if "GRAD" != persistable_var.split("@")[-1]:
......@@ -1482,6 +1477,7 @@ def get_vars_name_in_block(block):
return vars_name_list
# reserve static_var
def delete_trainer_useless_var(program, static_var):
static_var = list(set(static_var))
program_useful_var_list = []
......@@ -1525,6 +1521,67 @@ def create_backward_block(program, origin_program, bp_ops_list,
return heter_block
def is_backward_op(op):
return op_role_attr_name in op.attr_names and (
int(op.attr(op_role_attr_name)) & int(op_role.Backward))
def is_forward_op(op):
return op_role_attr_name in op.attr_names and (
int(op.attr(op_role_attr_name)) == int(op_role.Forward))
def is_push_sparse_op(op):
return op.type == 'distributed_push_sparse'
def get_distributed_push_sparse_op_list(block):
push_sparse_op_list = []
for op_idx in range(block.desc.op_size()):
op = block.ops[op_idx]
if is_push_sparse_op(op):
push_sparse_op_list.append(op)
return push_sparse_op_list
def get_bp_op_list(block):
bp_op_list = []
for op_idx in range(block.desc.op_size()):
op = block.ops[op_idx]
if is_backward_op(op):
bp_op_list.append(op)
return bp_op_list
def delete_same_ops(block, ops):
for op in ops:
try:
for origin_op in block.ops:
if str(origin_op) == str(op):
idx = list(block.ops).index(origin_op)
block._remove_op(idx)
break
except Exception as e:
print(e)
def check_program(program):
block_idx = 0
for block in program.blocks:
for op in block.ops:
input_var_names = op.desc.input_arg_names()
output_var_names = op.desc.output_arg_names()
for var_name in (input_var_names + output_var_names):
if not block._find_var_recursive(str(var_name)):
raise ValueError(
'var: {} needed by op is not found in block: {}'.format(
str(var_name), block_idx))
block_idx += 1
print('program checked valid')
def debug_program(file, program):
# py >= 3.2
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w+') as f:
f.write(str(program))
......@@ -1326,6 +1326,8 @@ class Executor(object):
use_program_cache=use_program_cache)
if isinstance(program, Program) and program._heter_pipeline_opt:
#print("program._heter_pipeline_opt: {}".format(
# program._heter_pipeline_opt))
## change default executor
heter_place = program._heter_pipeline_opt["heter_place"]
heter_place = framework._get_paddle_place(heter_place)
......@@ -1334,6 +1336,7 @@ class Executor(object):
self._default_executor = core.Executor(p)
# TODO(zhangminxu): support heterps pipeline training using exe.run
if "startup_program" in program._heter_pipeline_opt:
#print("get startup_program from _pipeline_opt")
program = program._heter_pipeline_opt["startup_program"]
if isinstance(program, Program) and \
......@@ -1391,6 +1394,7 @@ class Executor(object):
return False
compiled = isinstance(program, compiler.CompiledProgram)
# print("compiled is : {}".format(compiled))
# NOTE(zhiqiu): do not support compiled program now
if compiled:
return False
......@@ -1778,8 +1782,10 @@ class Executor(object):
dataset.set_use_var(data_vars)
elif program._heter_pipeline_opt is not None:
stage_id = program._heter_pipeline_opt["pipeline_stage"]
#print("test_fl_stage_id: {}".format(stage_id))
heter_place = program._heter_pipeline_opt["heter_place"]
if stage_id != 0:
if "is_fl_mode" not in program._heter_pipeline_opt:
import paddle
if dataset is not None:
raise RuntimeError(
......@@ -1855,10 +1861,11 @@ class Executor(object):
# warning if dataset not set psgpu in psgpu mode
if dataset.use_ps_gpu is False and trainer.proto_desc.use_ps_gpu:
logging.warning("dataset should call set_use_ps_gpu in PsGpu mode")
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
if program._heter_pipeline_opt is None:
trainer_instance = self._default_executor.init_for_dataset(
trainer_instance = self._default_executor.init_for_dataset( # -->InitForDataset
program.desc, trainer._desc(), scope, dataset.dataset)
else:
# cache trainer instance for heterps pipeline training
......@@ -1869,6 +1876,7 @@ class Executor(object):
if trainer_instance is None:
trainer_instance = self._default_executor.init_for_dataset(
program.desc, trainer._desc(), scope, dataset.dataset)
#print("test_fl_ps - trainer_desc: {}\n".format(trainer))
self._add_trainer_cache(cache_key, trainer_instance)
else:
trainer_instance.ResetDataset(dataset.dataset)
......@@ -2341,20 +2349,6 @@ class Executor(object):
fetch_info=None,
print_period=100,
fetch_handler=None):
return self._start_heter_trainer(program, scope, False, debug,
fetch_list, fetch_info, print_period,
fetch_handler)
def _start_heter_trainer(self,
program=None,
scope=None,
is_infer=False,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None):
scope, trainer = self._prepare_trainer(
program=program,
dataset=None,
......@@ -2365,7 +2359,7 @@ class Executor(object):
fetch_info=fetch_info,
print_period=print_period)
trainer._set_infer(is_infer)
trainer._set_infer(False)
trainer._gen_trainer_desc()
self._dump_debug_info(program=program, trainer=trainer)
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
class CriteoDataset(dg.MultiSlotDataGenerator):
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
features = line.rstrip('\n').split('\t')
feature_name = []
sparse_feature = []
for idx in categorical_range_:
sparse_feature.append(
[hash(str(idx) + features[idx]) % hash_dim_])
for idx in categorical_range_:
feature_name.append("C" + str(idx - 13))
yield list(zip(feature_name, sparse_feature))
return reader
d = CriteoDataset()
d.run_from_stdin()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
class CriteoDataset(dg.MultiSlotDataGenerator):
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
features = line.rstrip('\n').split('\t')
dense_feature = []
for idx in continuous_range_:
if features[idx] == "":
dense_feature.append(0.0)
else:
dense_feature.append(
(float(features[idx]) - cont_min_[idx - 1]) /
cont_diff_[idx - 1])
label = [int(features[0])]
feature_name = ["dense_feature"]
feature_name.append("label")
yield list(zip(feature_name, [label] + [dense_feature]))
return reader
d = CriteoDataset()
d.run_from_stdin()
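As a rough sketch of how these generators are consumed (mirroring get_dataset() in fl_ps_trainer.py below, with placeholder feed variables standing in for StaticModel.create_feeds()): each party pipes its raw Criteo files through the matching generator as a dataset pipe command, party A producing the 26 sparse slots and party B producing the label plus the 13 dense features.
import paddle
import paddle.fluid as fluid

paddle.enable_static()
# Placeholder feed vars; the real ones come from StaticModel.create_feeds() in ps_dnn_model.py.
label = paddle.static.data(name="label", shape=[-1, 1], dtype="int64")
dense = paddle.static.data(name="dense_feature", shape=[-1, 13], dtype="float32")

dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var([label, dense])   # order must match the value order yielded by dataset_generator_B.py
dataset.set_pipe_command("python dataset_generator_B.py")   # party A would use dataset_generator_A.py
dataset.set_batch_size(2)
dataset.set_thread(16)
dataset.set_filelist(["./train_data/part-0", "./train_data/part-1"])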
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz
mv ./raw_data ./train_data_full
mkdir train_data && cd train_data
cp ../train_data_full/part-0 ../train_data_full/part-1 ./ && cd ..
mv ./test_data ./test_data_full
mkdir test_data && cd test_data
cp ../test_data_full/part-220 ./ && cd ..
echo "Complete data download."
echo "Full Train data stored in ./train_data_full "
echo "Full Test data stored in ./test_data_full "
echo "Rapid Verification train data stored in ./train_data "
echo "Rapid Verification test data stored in ./test_data "
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# refer to PaddleRec/models/rank/dnn/benchmark.yaml
hyper_parameters:
optimizer:
class: Adam
learning_rate: 0.0001
adam_lazy_mode: True
sparse_inputs_slots: 27
sparse_feature_number: 1000001
sparse_feature_dim: 10
dense_input_dim: 13
fc_sizes: []
runner:
sync_mode: "async" # sync / async / geo / heter
is_fl_ps_mode: 1
reader_thread_num: 16
use_gpu: 0
batch_size: 2
train_files_path: "./train_data"
epoch_num: 4
model_path: "../ps_dnn_model.py"
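For reference, a sketch of how the runner section above is expected to surface in the fleet distributed strategy; this mirrors the async branch of get_user_defined_strategy() in ps_dnn_trainer.py further below and assumes the is_fl_ps_mode field added by this patch.
import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.a_sync = True                               # runner.sync_mode: "async"
strategy.is_fl_ps_mode = True                        # runner.is_fl_ps_mode: 1
strategy.pipeline = False                            # the generic pipeline meta-optimizer is disabled here
strategy.pipeline_configs = {"accumulate_steps": 1}  # accumulate_steps == number of micro-batches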
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import os
import unittest
import numpy as np
import time
import paddle
from paddle.distributed.ps.utils.public import ps_log_root_dir, debug_program
import paddle.distributed.fleet as fleet
import paddle.fluid as fluid
def get_dataset(inputs, config, pipe_cmd, role="worker"):
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(config.get('runner.batch_size'))
reader_thread_num = int(config.get('runner.reader_thread_num'))
dataset.set_thread(reader_thread_num)
train_files_path = config.get('runner.train_files_path')
print('train_data_files:{}'.format(train_files_path))
file_list = [
os.path.join(train_files_path, x) for x in os.listdir(train_files_path)
]
if role == "worker":
file_list = fleet.util.get_file_shard(file_list)
print("worker file list: {}".format(file_list))
elif role == "heter_worker":
file_list = fleet.util.get_heter_file_shard(file_list)
print("heter worker file list: {}".format(file_list))
return dataset, file_list
def fl_ps_train():
# 0. get role
import paddle.distributed.fleet.base.role_maker as role_maker
role_maker = role_maker.PaddleCloudRoleMaker()
role_maker._generate_role()
fleet.util._set_role_maker(role_maker)
# 1. load yaml-config to dict-config
from ps_dnn_trainer import YamlHelper, StaticModel, get_user_defined_strategy
yaml_helper = YamlHelper()
config_yaml_path = '../ps/fl_async_ps_config.yaml'
config = yaml_helper.load_yaml(config_yaml_path)
#yaml_helper.print_yaml(config)
# 2. get static model
paddle.enable_static()
model = StaticModel(config)
feeds_list = model.create_feeds()
metrics = model.fl_net(feeds_list)
loss = model._cost
# 3. compile time - build program_desc
user_defined_strategy = get_user_defined_strategy(config)
a_sync_configs = user_defined_strategy.a_sync_configs
a_sync_configs["launch_barrier"] = True
user_defined_strategy.a_sync_configs = a_sync_configs
print("launch_barrier: ",
user_defined_strategy.a_sync_configs["launch_barrier"])
learning_rate = config.get("hyper_parameters.optimizer.learning_rate")
inner_optimizer = paddle.optimizer.Adam(learning_rate, lazy_mode=True)
from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer
ps_optimizer = ParameterServerOptimizer(inner_optimizer)
ps_optimizer._set_basic_info(loss, role_maker, inner_optimizer,
user_defined_strategy)
ps_optimizer.minimize_impl(loss)
# 4. runtime
from paddle.distributed.ps.the_one_ps import TheOnePSRuntime
_runtime_handle = TheOnePSRuntime()  # the refactored TheOnePSRuntime under the ps directory
_runtime_handle._set_basic_info(ps_optimizer.pass_ctx._attrs)
epoch_num = int(config.get('runner.epoch_num'))
# 4.1 run server - build fleet_desc
if role_maker._is_server():
_runtime_handle._init_server()
_runtime_handle._run_server()
# 4.2 run worker
elif role_maker._is_worker():
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
_runtime_handle._init_worker()
print('trainer get dataset')
inputs = feeds_list[1:-1]
dataset, file_list = get_dataset(inputs, config,
"python dataset_generator_A.py")
print("fluid.default_main_program: {}".format(
fluid.default_main_program()._heter_pipeline_opt))
for epoch in range(epoch_num):
# if party A and party B shuffle at file granularity, both must use the same fixed random seed
dataset.set_filelist(file_list)
start_time = time.time()
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
print_period=2,
debug=False)
end_time = time.time()
print("trainer epoch %d finished, use time=%d\n" % (
(epoch), end_time - start_time))
exe.close()
_runtime_handle._stop_worker()
print("Fl partyA Trainer Success!")
else:
exe = fluid.Executor()
exe.run(fluid.default_startup_program())
_runtime_handle._init_worker()
inputs = [feeds_list[0],
feeds_list[-1]]  # the order must stay consistent with dataset_generator_B.py
dataset, file_list = get_dataset(
inputs, config, "python dataset_generator_B.py", "heter_worker")
print("fluid.default_main_program: {}".format(
fluid.default_main_program()._heter_pipeline_opt))
for epoch in range(epoch_num):
dataset.set_filelist(file_list)
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
print_period=2,
debug=False)
exe.close()
_runtime_handle._stop_worker()
print("Fl partB Trainer Success!")
if __name__ == '__main__':
fl_ps_train()
......@@ -35,7 +35,7 @@ sys.path.append(os.path.abspath(os.path.join(__dir__, '..')))
def is_distributed_env():
node_role = os.getenv("TRAINING_ROLE")
logger.info("-- Role: {} --".format(node_role))
print("-- Role: {} --".format(node_role))
if node_role is None:
return False
else:
......@@ -167,6 +167,14 @@ def get_user_defined_strategy(config):
elif sync_mode == "async":
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.is_fl_ps_mode = (config.get("runner.is_fl_ps_mode") == 1)
if strategy.is_fl_ps_mode:
strategy.pipeline = False
micro_num = 1
strategy.pipeline_configs = {
"accumulate_steps": micro_num
} ## num_microbatches
elif sync_mode == "geo":
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
......@@ -215,13 +223,14 @@ def get_user_defined_strategy(config):
print("strategy table config:", strategy.sparse_table_configs)
a_sync_configs = strategy.a_sync_configs
a_sync_configs["launch_barrier"] = False
# a_sync_configs["launch_barrier"] = True
strategy.a_sync_configs = a_sync_configs
print("launch_barrier: ", strategy.a_sync_configs["launch_barrier"])
return strategy
def get_distributed_strategy(user_defined_strategy):
def get_distributed_strategy(user_defined_strategy): # pslib
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
k_steps = user_defined_strategy.a_sync_configs["k_steps"]
......@@ -318,14 +327,14 @@ class DnnTrainer(object):
fleet.init()
if fleet.is_server():
logger.info("server: {} started".format(fleet.server_index()))
print("server: {} started".format(fleet.server_index()))
else:
logger.info("worker: {} started".format(fleet.worker_index()))
print("worker: {} started".format(fleet.worker_index()))
def run_minimize(self):
self.init_fleet_with_gloo()
self.model = get_model(self.config)
logger.info("cpu_num: {}".format(os.getenv("CPU_NUM")))
print("cpu_num: {}".format(os.getenv("CPU_NUM")))
self.input_data = self.model.create_feeds()
self.metrics = self.model.net(self.input_data)
loss = self.model._cost
......@@ -337,14 +346,14 @@ class DnnTrainer(object):
self.role_maker._generate_role()  # required
if self.config['debug_new_minimize'] == 1:
logger.info("entering run_minimize -- new")
print("entering run_minimize -- new")
from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer
ps_optimizer = ParameterServerOptimizer(inner_optimizer)
ps_optimizer._set_basic_info(loss, self.role_maker, inner_optimizer,
user_defined_strategy)
ps_optimizer.minimize_impl(loss)
else:
logger.info("entering run_minimize -- old")
print("entering run_minimize -- old")
fleet_obj = fleet.distributed_optimizer(
inner_optimizer, user_defined_strategy)  ## Fleet object
fleet_obj.minimize(loss)
......@@ -376,7 +385,7 @@ class DnnTrainer(object):
startup_program = paddle.static.default_startup_program()
inner_optimizer.minimize(loss, startup_program)
if self.config['debug_new_pass'] == 1:
logger.info("entering run {} - new".format(
print("entering run {} - new".format(
str(config["applied_pass_name"])))
from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer
ps_optimizer = ParameterServerOptimizer(inner_optimizer)
......@@ -390,7 +399,7 @@ class DnnTrainer(object):
ps_optimizer.pass_ctx._attrs)
append_send_ops_pass.apply([_main], [None], ps_optimizer.pass_ctx)
else:
logger.info("entering run {} - old".format(
print("entering run {} - old".format(
str(config["applied_pass_name"])))
from paddle.fluid.incubate.fleet.parameter_server.ir import public as public
dist_strategy = get_distributed_strategy(user_defined_strategy)
......@@ -428,7 +437,7 @@ class DnnTrainer(object):
self.role_maker._generate_role()  # required
if self.config['debug_the_one_ps'] == 1:
logger.info("entering run_the_one_ps -- new")
print("entering run_the_one_ps -- new")
from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer
ps_optimizer = ParameterServerOptimizer(inner_optimizer)
......@@ -455,7 +464,7 @@ class DnnTrainer(object):
else:
pass
'''
logger.info("entering run_the_one_ps -- old")
print("entering run_the_one_ps -- old")
fleet_obj = fleet.distributed_optimizer(
inner_optimizer, user_defined_strategy)
fleet_obj.minimize(loss)
......@@ -486,7 +495,7 @@ class DnnTrainer(object):
if __name__ == "__main__":
paddle.enable_static()
config = parse_args()
logger.info(">>>>>>>>>> python process started")
print(">>>>>>>>>> python process started")
os.environ["CPU_NUM"] = str(config.get("runner.thread_num"))
benchmark_main = DnnTrainer(config)
if config['run_single_pass'] == 1:
......
#!/bin/bash
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import shlex
from paddle.fluid.tests.unittests.distributed_passes.dist_pass_test_base import prepare_python_path_and_return_module, remove_path_if_exists
import os
class FlPsTest(unittest.TestCase):
def test_launch_fl_ps(self):
pass
'''
cmd = [
'python', '-m', 'paddle.distributed.fleet.launch', '--log_dir',
'/ps_log/fl_ps', '--servers', "127.0.0.1:8070", '--workers',
"127.0.0.1:8080,127.0.0.1:8081", '--heter_workers',
"127.0.0.1:8090,127.0.0.1:8091", '--heter_devices', "cpu",
'--worker_num', "2", '--heter_worker_num', "2", 'fl_ps_trainer.py'
]
cmd = [shlex.quote(c) for c in cmd]
prepare_python_path_and_return_module(__file__)
exitcode = os.system(' '.join(cmd))
'''
if __name__ == '__main__':
remove_path_if_exists('/ps_log')
remove_path_if_exists('/ps_usr_print_log')
if not os.path.exists('./train_data'):
os.system('sh download_data.sh')
os.system('rm -rf ctr_data.tar.gz')
os.system('rm -rf train_data_full')
os.system('rm -rf test_data_full')
unittest.main()
if os.path.exists('./train_data'):
os.system('rm -rf train_data')
os.system('rm -rf test_data')
......@@ -17,7 +17,6 @@ import paddle.nn as nn
import paddle.nn.functional as F
import math
import paddle.distributed.fleet as fleet
from paddle.distributed.ps.utils.public import logger
class DNNLayer(nn.Layer):
......@@ -90,6 +89,154 @@ class DNNLayer(nn.Layer):
return y_dnn
class FlDNNLayer(nn.Layer):
def __init__(self,
sparse_feature_number,
sparse_feature_dim,
dense_feature_dim,
sparse_number,
sync_mode=None):
super(FlDNNLayer, self).__init__()
self.PART_A_DEVICE_FlAG = 'gpu:0'
self.PART_A_JOINT_OP_DEVICE_FlAG = 'gpu:2'
self.PART_B_DEVICE_FlAG = 'gpu:1'
self.PART_B_JOINT_OP_DEVICE_FlAG = 'gpu:3'
self.sync_mode = sync_mode
self.sparse_feature_number = sparse_feature_number
self.sparse_feature_dim = sparse_feature_dim
self.slot_num = sparse_number
self.dense_feature_dim = dense_feature_dim
layer_sizes_a = [self.slot_num * self.sparse_feature_dim, 5,
7] # for test
layer_sizes_b = [self.dense_feature_dim, 6, 7]
layer_sizes_top = [7, 2]
self.embedding = paddle.nn.Embedding(
self.sparse_feature_number,
self.sparse_feature_dim,
sparse=True,
weight_attr=paddle.ParamAttr(
name="SparseFeatFactors",
initializer=paddle.nn.initializer.Uniform()))
# part_a fc
acts = ["relu" for _ in range(len(layer_sizes_a))]
self._mlp_layers_a = []
for i in range(len(layer_sizes_a) - 1):
linear = paddle.nn.Linear(
in_features=layer_sizes_a[i],
out_features=layer_sizes_a[i + 1],
weight_attr=paddle.ParamAttr(
initializer=paddle.nn.initializer.Normal(
std=1.0 / math.sqrt(layer_sizes_a[i]))))
self.add_sublayer('linear_%d' % i, linear)
self._mlp_layers_a.append(linear)
act = paddle.nn.ReLU()
self.add_sublayer('act_%d' % i, act)
self._mlp_layers_a.append(act)
# part_b fc
acts = ["relu" for _ in range(len(layer_sizes_b))]
self._mlp_layers_b = []
for i in range(len(layer_sizes_b) - 1):
linear = paddle.nn.Linear(
in_features=layer_sizes_b[i],
out_features=layer_sizes_b[i + 1],
weight_attr=paddle.ParamAttr(
initializer=paddle.nn.initializer.Normal(
std=1.0 / math.sqrt(layer_sizes_b[i]))))
self.add_sublayer('linear_%d' % i, linear)
self._mlp_layers_b.append(linear)
act = paddle.nn.ReLU()
self.add_sublayer('act_%d' % i, act)
self._mlp_layers_b.append(act)
# top fc
acts = ["relu" for _ in range(len(layer_sizes_top))]
self._mlp_layers_top = []
for i in range(len(layer_sizes_top) - 1):
linear = paddle.nn.Linear(
in_features=layer_sizes_top[i],
out_features=layer_sizes_top[i + 1],
weight_attr=paddle.ParamAttr(
initializer=paddle.nn.initializer.Normal(
std=1.0 / math.sqrt(layer_sizes_top[i]))))
self.add_sublayer('linear_%d' % i, linear)
self._mlp_layers_top.append(linear)
act = paddle.nn.ReLU()
self.add_sublayer('act_%d' % i, act)
self._mlp_layers_top.append(act)
def bottom_a_layer(self, sparse_inputs):
with paddle.fluid.device_guard(self.PART_A_DEVICE_FlAG):
sparse_embs = []
for s_input in sparse_inputs:
emb = self.embedding(s_input)
emb = paddle.reshape(emb, shape=[-1, self.sparse_feature_dim])
sparse_embs.append(emb)
y = paddle.concat(x=sparse_embs, axis=1)
y = self._mlp_layers_a[0](y)
y = self._mlp_layers_a[1](y)
y = self._mlp_layers_a[2](y)
with paddle.fluid.device_guard(
self.PART_A_JOINT_OP_DEVICE_FlAG): # joint point
bottom_a = self._mlp_layers_a[3](y)
return bottom_a
def bottom_b_layer(self, dense_inputs):
with paddle.fluid.device_guard(self.PART_B_DEVICE_FlAG):
y = self._mlp_layers_b[0](dense_inputs)
y = self._mlp_layers_b[1](y)
y = self._mlp_layers_b[2](y)
bottom_b = self._mlp_layers_b[3](y)
return bottom_b
def interactive_layer(self, bottom_a, bottom_b):
with paddle.fluid.device_guard(
self.PART_B_JOINT_OP_DEVICE_FlAG): # joint point
interactive = paddle.fluid.layers.elementwise_add(bottom_a,
bottom_b)
return interactive
def top_layer(self, interactive, label_input):
with paddle.fluid.device_guard(self.PART_B_DEVICE_FlAG):
y = self._mlp_layers_top[0](interactive)
y_top = self._mlp_layers_top[1](y)
predict_2d = paddle.nn.functional.softmax(y_top)
auc, batch_auc, [
self.batch_stat_pos, self.batch_stat_neg, self.stat_pos,
self.stat_neg
] = paddle.static.auc(input=predict_2d,
label=label_input,
num_thresholds=2**12,
slide_steps=20)
cost = paddle.nn.functional.cross_entropy(
input=y_top, label=label_input)
avg_cost = paddle.mean(x=cost)
return auc, avg_cost
def forward(self, sparse_inputs, dense_inputs, label_input):
bottom_a = self.bottom_a_layer(sparse_inputs)
bottom_b = self.bottom_b_layer(dense_inputs)
interactive = self.interactive_layer(bottom_a, bottom_b)
auc, avg_cost = self.top_layer(interactive, label_input)
return auc, avg_cost
class StaticModel():
def __init__(self, config):
self.cost = None
......@@ -147,13 +294,9 @@ class StaticModel():
sparse_number,
self.fc_sizes,
sync_mode=self.sync_mode)
raw_predict_2d = dnn_model.forward(self.sparse_inputs, self.dense_input)
predict_2d = paddle.nn.functional.softmax(raw_predict_2d)
self.predict = predict_2d
auc, batch_auc, [
self.batch_stat_pos, self.batch_stat_neg, self.stat_pos,
self.stat_neg
......@@ -173,3 +316,22 @@ class StaticModel():
fetch_dict = {'cost': avg_cost, 'auc': auc}
return fetch_dict
def fl_net(self, input, is_infer=False):
self.label_input = input[0]
self.sparse_inputs = input[1:self.sparse_inputs_slots]
self.dense_input = input[-1]
self.sparse_number = self.sparse_inputs_slots - 1
fl_dnn_model = FlDNNLayer(
self.sparse_feature_number,
self.sparse_feature_dim,
self.dense_input_dim,
self.sparse_number,
sync_mode=self.sync_mode)
auc, avg_cost = fl_dnn_model.forward(self.sparse_inputs,
self.dense_input, self.label_input)
fetch_dict = {'cost': avg_cost, 'auc': auc}
self._cost = avg_cost
return fetch_dict