From 8bb1038cedcb656be305b44b220c908e6a3155f3 Mon Sep 17 00:00:00 2001 From: zmx Date: Thu, 25 Nov 2021 16:10:04 +0800 Subject: [PATCH] [cherry-pick 2.2 heterps]bug fix for launch_utils.py (#37521) * fix. test=develop * fix. test=develop * fix. test=develop * fix. test=develop * fix. test=develop * [heterps]bug fix for _run_from_dataset * fix heter_server.cc * fix launch_utils.py * fix heter_section_worker.cc * fix. test=develop * fix. test=develop --- .../fluid/distributed/service/heter_server.cc | 2 +- .../fluid/framework/heter_section_worker.cc | 2 +- .../paddle/distributed/fleet/launch_utils.py | 19 ++++++++++--------- python/paddle/fluid/executor.py | 8 ++------ 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/distributed/service/heter_server.cc b/paddle/fluid/distributed/service/heter_server.cc index 035668f2bc7..fee3081f032 100644 --- a/paddle/fluid/distributed/service/heter_server.cc +++ b/paddle/fluid/distributed/service/heter_server.cc @@ -43,11 +43,11 @@ void HeterServer::StartHeterService() { { std::lock_guard lock(this->mutex_ready_); + stoped_ = false; ready_ = 1; } condition_ready_.notify_all(); std::unique_lock running_lock(mutex_); - stoped_ = false; cv_.wait(running_lock, [&] { VLOG(1) << "Heter Server is Stop? " << stoped_; return stoped_; diff --git a/paddle/fluid/framework/heter_section_worker.cc b/paddle/fluid/framework/heter_section_worker.cc index ace6ac49255..a8db38f8077 100644 --- a/paddle/fluid/framework/heter_section_worker.cc +++ b/paddle/fluid/framework/heter_section_worker.cc @@ -350,7 +350,7 @@ void HeterSectionWorker::BatchPostProcess() { DumpParam(*((*microbatch_scopes_)[0]), batch_num_); } // print each op time - if (thread_id_ == 0) { + if (debug_ && thread_id_ == 0) { size_t total_ops_size = forward_ops_.size() + backward_ops_.size(); if (batch_num_ > 0 && batch_num_ % 100 == 0) { for (size_t i = 0; i < total_ops_size; ++i) { diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index f0a252b34b5..f7f50e76af6 100644 --- a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -1088,8 +1088,6 @@ class ParameterServerLauncher(object): heter_worker_endpoints_list = args.heter_workers.split(";") self.heter_worker_endpoints = "" for i in range(len(heter_worker_endpoints_list)): - if self.heter_worker_endpoints != "": - self.heter_worker_endpoints += "," heter_worker_endpoints = heter_worker_endpoints_list[ i].split(",") self.stage_heter_trainer_num.append( @@ -1182,15 +1180,18 @@ class ParameterServerLauncher(object): _, self.current_node_ip = get_host_name_ip() else: self.current_node_ip = pod_ip - assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \ - % (self.current_node_ip, self.node_ips) - self.node_rank = self.node_ips.index(self.current_node_ip) - - logger.debug( - "parsed from args: node_ips:{} current_node_ip:{} node_rank:{}". - format(self.node_ips, self.current_node_ip, self.node_rank)) + if not self.distribute_mode == DistributeMode.PS_HETER: + assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \ + % (self.current_node_ip, self.node_ips) + if self.current_node_ip in self.node_ips: + self.node_rank = self.node_ips.index(self.current_node_ip) + logger.debug( + "parsed from args: node_ips:{} current_node_ip:{} node_rank:{}". + format(self.node_ips, self.current_node_ip, self.node_rank)) def start_ps(self): + if not self.current_node_ip in self.node_ips: + return cluster = Cluster(hdfs=None) server_rank = 0 worker_rank = 0 diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 1ed52e52fa6..da153b285e9 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -1737,12 +1737,8 @@ class Executor(object): for var in program.global_block().vars.values(): if var.is_data: data_vars.append(var) - if core.is_compiled_with_npu(): - dataset = paddle.fluid.DatasetFactory().create_dataset( - 'InMemoryDataset') - else: - dataset = paddle.fluid.DatasetFactory().create_dataset( - 'FileInstantDataset') + dataset = paddle.fluid.DatasetFactory().create_dataset( + 'InMemoryDataset') dataset.set_batch_size(1) dataset.set_thread(1) dataset.set_filelist(['None']) -- GitLab