diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index 3862b23e2d5569964e9610146099454257f5b423..3038719539251cd6e5aec5692aab2b695b4212cd 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -168,6 +168,7 @@ class DeviceWorker {
   virtual void CacheProgram(const ProgramDesc& main_program) {}
   virtual void ProduceTasks() {}
   virtual void GetXpuOpIndex() {}
+  virtual void Schedule(int taskid) {}
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
   virtual void SetStream(const gpuStream_t stream) {}
   virtual void SetEvent(const gpuEvent_t event) {}
diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc
index 7b3f03c6f5f135e62e8f95b2480cbbf2170654c7..4c8681aad2bb1cd9c8871868263e73a55cecc9eb 100644
--- a/paddle/fluid/framework/dist_multi_trainer.cc
+++ b/paddle/fluid/framework/dist_multi_trainer.cc
@@ -62,9 +62,8 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,

 void DistMultiTrainer::RegisterHeterCallback() {
   auto fleet_ptr = FleetWrapper::GetInstance();
-  fleet_ptr->RegisterHeterCallback([this](int worker, int taskid) {
-    // workers_[worker]->Schedule(taskid);
-  });
+  fleet_ptr->RegisterHeterCallback(
+      [this](int worker, int taskid) { workers_[worker]->Schedule(taskid); });
 }

 void DistMultiTrainer::InitDumpEnv() {
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc
index 0c0792a95cd70c196a5519544b53b3400e855c13..3cd8b55026e5189f46423163985cf18e4e4fcdad 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.cc
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc
@@ -193,7 +193,6 @@ void FleetWrapper::HeterPullSparseVars(
   for (auto& t : fea_values) {
     pull_result_ptr.push_back(t.data());
   }
-  /*
   auto status = pslib_ptr_->_worker_ptr->heter_pull_sparse(
       workerid, pull_result_ptr.data(), table_id, fea_keys.data(),
       fea_keys.size(), task->taskid_);
@@ -207,7 +206,6 @@ void FleetWrapper::HeterPullSparseVars(
       exit(-1);
     }
   }
-  */
 }

 void FleetWrapper::HeterPushSparseVars(
diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index e3c417d4a62573a7340736c8510b467a4c3419cc..d3737e742b478336bcaf75f46e26a79a0bf053a3 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -1039,11 +1039,17 @@ class HeterRoleMaker(GeneralRoleMaker):
                 self._node_type = 1
                 self._cur_endpoint = worker_endpoints[current_id]
                 gloo = fluid.core.Gloo()
-                gloo.init(current_id,
-                          len(worker_endpoints),
-                          self._hdfs_path.rstrip("/") + "/trainer",
-                          self._hdfs_name, self._hdfs_ugi, self._iface,
-                          self._prefix)
+
+                gloo.set_rank(current_id)
+                gloo.set_size(len(worker_endpoints))
+                gloo.set_prefix(self._prefix)
+                gloo.set_iface(self._iface)
+                gloo.set_timeout_seconds(self._init_timeout_seconds,
+                                         self._run_timeout_seconds)
+                gloo.set_hdfs_store(
+                    self._hdfs_path.rstrip("/") + "/trainer", self._hdfs_name,
+                    self._hdfs_ugi)
+                gloo.init()
                 self._node_type_comm = gloo
             elif training_role == "XPU":
                 role = Role.XPU
@@ -1051,10 +1057,17 @@
                 self._node_type = 2
                 self._cur_endpoint = xpu_endpoints[current_id]
                 gloo = fluid.core.Gloo()
-                gloo.init(current_id,
-                          len(xpu_endpoints),
-                          self._hdfs_path.rstrip("/") + "/xpu", self._hdfs_name,
-                          self._hdfs_ugi, self._iface, self._prefix)
+
+                gloo.set_rank(current_id)
+                gloo.set_size(len(xpu_endpoints))
+                gloo.set_prefix(self._prefix)
+                gloo.set_iface(self._iface)
+                gloo.set_timeout_seconds(self._init_timeout_seconds,
+                                         self._run_timeout_seconds)
+                gloo.set_hdfs_store(
+                    self._hdfs_path.rstrip("/") + "/xpu", self._hdfs_name,
+                    self._hdfs_ugi)
+                gloo.init()
                 self._node_type_comm = gloo
             elif training_role == "PSERVER":
                 role = Role.SERVER
@@ -1070,30 +1083,47 @@
                 self._node_type = 0
                 self._cur_endpoint = cur_endpoint
                 gloo = fluid.core.Gloo()
-                gloo.init(current_id,
-                          len(eplist),
-                          self._hdfs_path.rstrip("/") + "/pserver",
-                          self._hdfs_name, self._hdfs_ugi, self._iface,
-                          self._prefix)
+                gloo.set_rank(current_id)
+                gloo.set_size(len(eplist))
+                gloo.set_prefix(self._prefix)
+                gloo.set_iface(self._iface)
+                gloo.set_timeout_seconds(self._init_timeout_seconds,
+                                         self._run_timeout_seconds)
+                gloo.set_hdfs_store(
+                    self._hdfs_path.rstrip("/") + "/pserver", self._hdfs_name,
+                    self._hdfs_ugi)
+                gloo.init()
                 self._node_type_comm = gloo

             if training_role == "TRAINER" or training_role == "XPU":
                 gloo = fluid.core.Gloo()
                 heter_list = worker_endpoints + xpu_endpoints
-                gloo.init(
-                    heter_list.index(self._cur_endpoint),
-                    len(heter_list),
+
+                gloo.set_rank(heter_list.index(self._cur_endpoint))
+                gloo.set_size(len(heter_list))
+                gloo.set_prefix(self._prefix)
+                gloo.set_iface(self._iface)
+                gloo.set_timeout_seconds(self._init_timeout_seconds,
+                                         self._run_timeout_seconds)
+                gloo.set_hdfs_store(
                     self._hdfs_path.rstrip("/") + "/heter", self._hdfs_name,
-                    self._hdfs_ugi, self._iface, self._prefix)
+                    self._hdfs_ugi)
+                gloo.init()
                 self._heter_comm = gloo

             gloo = fluid.core.Gloo()
             all_list = worker_endpoints + eplist + xpu_endpoints
-            gloo.init(
-                all_list.index(self._cur_endpoint),
-                len(all_list),
+
+            gloo.set_rank(all_list.index(self._cur_endpoint))
+            gloo.set_size(len(all_list))
+            gloo.set_prefix(self._prefix)
+            gloo.set_iface(self._iface)
+            gloo.set_timeout_seconds(self._init_timeout_seconds,
+                                     self._run_timeout_seconds)
+            gloo.set_hdfs_store(
                 self._hdfs_path.rstrip("/") + "/all", self._hdfs_name,
-                self._hdfs_ugi, self._iface, self._prefix)
+                self._hdfs_ugi)
+            gloo.init()
             self._all_comm = gloo

             self._trainers_num = trainers_num
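
For reviewers, a minimal sketch of the setter-based Gloo bootstrap this patch switches HeterRoleMaker to, assuming a Paddle build whose fluid.core.Gloo exposes the setters used above. The helper name init_gloo_comm and every concrete value (rank, size, HDFS path/name/ugi, iface, prefix, timeout defaults) are hypothetical placeholders for illustration, not taken from the patch.

# Sketch only: mirrors the gloo setup sequence introduced in role_maker.py above.
# init_gloo_comm and all concrete arguments are illustrative placeholders.
import paddle.fluid as fluid

def init_gloo_comm(rank, size, hdfs_path, hdfs_name, hdfs_ugi,
                   iface="lo", prefix="", init_timeout=3600, run_timeout=9999999):
    gloo = fluid.core.Gloo()
    gloo.set_rank(rank)        # this process's index within the group
    gloo.set_size(size)        # total number of participants in the group
    gloo.set_prefix(prefix)    # prefix shared by all members of this group
    gloo.set_iface(iface)      # network interface used for communication
    gloo.set_timeout_seconds(init_timeout, run_timeout)
    gloo.set_hdfs_store(hdfs_path.rstrip("/") + "/trainer", hdfs_name, hdfs_ugi)
    gloo.init()                # rendezvous happens here, after all setters
    return gloo

The trainer, XPU, pserver, heter, and all-node groups in the patch follow this same sequence and differ only in the rank source, the group size, and the HDFS sub-path ("/trainer", "/xpu", "/pserver", "/heter", "/all").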