From a99c8d0c29e70ad38667df04778d5fe059014bae Mon Sep 17 00:00:00 2001 From: xjqbest <173596896@qq.com> Date: Sat, 30 Mar 2019 17:26:40 +0800 Subject: [PATCH] fix client to client communication bug test=develop --- paddle/fluid/framework/data_feed.cc | 36 +++++++++++++++++-- paddle/fluid/framework/fleet/fleet_wrapper.cc | 35 ++++++++++++------ paddle/fluid/framework/fleet/fleet_wrapper.h | 3 ++ paddle/fluid/pybind/fleet_wrapper_py.cc | 7 +++- .../fluid/incubate/fleet/base/role_maker.py | 9 +++++ .../fleet/parameter_server/__init__.py | 16 ++++++--- .../fluid/tests/unittests/test_dataset.py | 4 +++ 7 files changed, 93 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 507660744..e4e9861e3 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -125,6 +125,7 @@ void PrivateQueueDataFeed::ReadThread() { template int PrivateQueueDataFeed::Next() { +#ifdef _LINUX CheckStart(); int index = 0; T instance; @@ -140,6 +141,9 @@ int PrivateQueueDataFeed::Next() { PutToFeedVec(ins_vec); } return batch_size_; +#else + return 0; +#endif } // explicit instantiation @@ -159,16 +163,19 @@ InMemoryDataFeed::InMemoryDataFeed() { template bool InMemoryDataFeed::Start() { +#ifdef _LINUX DataFeed::CheckSetFileList(); if (shuffled_ins_->Size() == 0 && shuffled_ins_out_->Size() == 0) { FillMemoryDataToChannel(); } +#endif DataFeed::finish_start_ = true; return true; } template int InMemoryDataFeed::Next() { +#ifdef _LINUX DataFeed::CheckStart(); std::shared_ptr> in_channel = nullptr; std::shared_ptr> out_channel = nullptr; @@ -205,6 +212,9 @@ int InMemoryDataFeed::Next() { cur_channel_ = 1 - cur_channel_; } return DataFeed::batch_size_; +#else + return 0; +#endif } template @@ -234,16 +244,19 @@ void InMemoryDataFeed::SetTrainerNum(int trainer_num) { template void InMemoryDataFeed::PutInsToChannel(const std::string& ins_str) { +#ifdef _LINUX std::vector ins; DeserializeIns(&ins, ins_str); shuffled_ins_->Extend(std::move(ins)); VLOG(3) << "PutInsToChannel put ins num=" << ins.size() << " to channel, channel size=" << shuffled_ins_->Size() << " thread_id=" << thread_id_; +#endif } template void InMemoryDataFeed::FillMemoryDataToChannel() { +#ifdef _LINUX VLOG(3) << "FillMemoryDataToChannel, thread_id=" << thread_id_; auto interval = GetMemoryDataInterval(); VLOG(3) << "memory data size=" << memory_data_->size() @@ -253,6 +266,7 @@ void InMemoryDataFeed::FillMemoryDataToChannel() { T& t = (*memory_data_)[i]; shuffled_ins_->Push(std::move(t)); } +#endif } template @@ -334,9 +348,11 @@ void InMemoryDataFeed::LoadIntoMemory() { template void InMemoryDataFeed::LocalShuffle() { +#ifdef _LINUX VLOG(3) << "LocalShuffle() begin, thread_id=" << thread_id_; FillMemoryDataToChannel(); VLOG(3) << "LocalShuffle() end, thread_id=" << thread_id_; +#endif } template @@ -631,6 +647,7 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe( } bool MultiSlotDataFeed::ParseOneInstance(std::vector* instance) { +#ifdef _LINUX std::string line; if (getline(file_, line)) { int use_slots_num = use_slots_.size(); @@ -673,12 +690,14 @@ bool MultiSlotDataFeed::ParseOneInstance(std::vector* instance) { } else { return false; } - return true; +#endif + return false; } void MultiSlotDataFeed::AddInstanceToInsVec( std::vector* ins_vec, const std::vector& instance, int index) { +#ifdef _LINUX if (index == 0) { ins_vec->resize(instance.size()); for (size_t i = 0; i < instance.size(); ++i) { @@ -690,10 +709,12 @@ void MultiSlotDataFeed::AddInstanceToInsVec( for (size_t i = 0; i < instance.size(); ++i) { (*ins_vec)[i].AddIns(instance[i]); } +#endif } void MultiSlotDataFeed::PutToFeedVec( const std::vector& ins_vec) { +#ifdef _LINUX for (size_t i = 0; i < use_slots_.size(); ++i) { const auto& type = ins_vec[i].GetType(); const auto& offset = ins_vec[i].GetOffset(); @@ -719,6 +740,7 @@ void MultiSlotDataFeed::PutToFeedVec( feed_vec_[i]->Resize({batch_size_, dim}); } } +#endif } void MultiSlotInMemoryDataFeed::Init( @@ -756,6 +778,7 @@ void MultiSlotInMemoryDataFeed::Init( bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( std::vector* instance) { +#ifdef _LINUX thread_local string::LineFileReader reader; if (!reader.getline(&*(fp_.get()))) { @@ -804,10 +827,14 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe( } return true; } +#else + return false; +#endif } bool MultiSlotInMemoryDataFeed::ParseOneInstance( std::vector* instance) { +#ifdef _LINUX std::string line; if (getline(file_, line)) { int use_slots_num = use_slots_.size(); @@ -851,12 +878,14 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance( } else { return false; } - return true; +#endif + return false; } void MultiSlotInMemoryDataFeed::AddInstanceToInsVec( std::vector* ins_vec, const std::vector& instance, int index) { +#ifdef _LINUX if (index == 0) { ins_vec->resize(instance.size()); for (size_t i = 0; i < instance.size(); ++i) { @@ -868,10 +897,12 @@ void MultiSlotInMemoryDataFeed::AddInstanceToInsVec( for (size_t i = 0; i < instance.size(); ++i) { (*ins_vec)[i].AddIns(instance[i]); } +#endif } void MultiSlotInMemoryDataFeed::PutToFeedVec( const std::vector& ins_vec) { +#ifdef _LINUX for (size_t i = 0; i < use_slots_.size(); ++i) { const auto& type = ins_vec[i].GetType(); const auto& offset = ins_vec[i].GetOffset(); @@ -897,6 +928,7 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec( feed_vec_[i]->Resize({batch_size_, dim}); } } +#endif } // todo serialize ins in global shuffle diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 72fd1a9cf..06fde3304 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -121,6 +121,31 @@ void FleetWrapper::GatherServers(const std::vector& host_sign_list, #endif } +void FleetWrapper::GatherClients( + const std::vector& host_sign_list) { +#ifdef PADDLE_WITH_PSLIB + VLOG(3) << "Going to gather client ips"; + size_t len = host_sign_list.size(); + pslib_ptr_->gather_clients(const_cast(host_sign_list.data()), + len); +#endif +} + +std::vector FleetWrapper::GetClientsInfo() { +#ifdef PADDLE_WITH_PSLIB + VLOG(3) << "Going to get client info"; + return pslib_ptr_->get_client_info(); +#endif + return std::vector(); +} + +void FleetWrapper::CreateClient2ClientConnection() { +#ifdef PADDLE_WITH_PSLIB + VLOG(3) << "Going to create client2client connection"; + pslib_ptr_->create_client2client_connection(); +#endif +} + void FleetWrapper::PullSparseVarsSync( const Scope& scope, const uint64_t table_id, const std::vector& var_names, std::vector* fea_keys, @@ -142,16 +167,6 @@ void FleetWrapper::PullSparseVarsSync( } fea_keys->push_back(static_cast(ids[i])); } - /* - fea_values->resize(fea_keys->size() + 1); - for (auto& t : *fea_values) { - t.resize(fea_value_dim); - } - std::vector pull_result_ptr; - for (auto& t : *fea_values) { - pull_result_ptr.push_back(t.data()); - } - */ } fea_values->resize(fea_keys->size() + 1); for (auto& t : *fea_values) { diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index 07eb670cb..294367722 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -121,6 +121,9 @@ class FleetWrapper { void StopServer(); uint64_t RunServer(); void GatherServers(const std::vector& host_sign_list, int node_num); + void GatherClients(const std::vector& host_sign_list); + std::vector GetClientsInfo(); + void CreateClient2ClientConnection(); typedef std::function MsgHandlerFunc; int RegisterClientToClientMsgHandler(int msg_type, MsgHandlerFunc handler); diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc index 444a3c7f1..57f521951 100644 --- a/paddle/fluid/pybind/fleet_wrapper_py.cc +++ b/paddle/fluid/pybind/fleet_wrapper_py.cc @@ -49,7 +49,12 @@ void BindFleetWrapper(py::module* m) { .def("init_worker", &framework::FleetWrapper::InitWorker) .def("init_model", &framework::FleetWrapper::PushDenseParamSync) .def("stop_server", &framework::FleetWrapper::StopServer) - .def("gather_servers", &framework::FleetWrapper::GatherServers); + .def("gather_servers", &framework::FleetWrapper::GatherServers) + .def("gather_clients", &framework::FleetWrapper::GatherClients) + .def("get_clients_info", &framework::FleetWrapper::GetClientsInfo) + .def("create_client2client_connection", + &framework::FleetWrapper::CreateClient2ClientConnection); + } // end FleetWrapper } // end namespace pybind } // end namespace paddle diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index 708efed5e..528f7b326 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -101,6 +101,15 @@ class MPIRoleMaker(RoleMakerBase): self._barrier_all() return self.comm_.allgather(obj) + def _worker_gather(self, obj): + """ + worker_gather(obj) will call MPI's allgather function + """ + if self._is_worker(): + self.node_type_comm_.barrier() + return self.node_type_comm_.allgather(obj) + return None + def _barrier_all(self): """ barrier_all() will call MPI's barrier_all function diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py index 2a5456ddb..044aa33c2 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py @@ -111,12 +111,13 @@ class Fleet(object): self._fleet_ptr.init_server(self._dist_desc_str, self.role_maker_._get_rank()) self.local_ip_ = self._fleet_ptr.run_server() + # barrier_all for init_server self.role_maker_._barrier_all() self.all_ips_ = self.role_maker_._all_gather(self.local_ip_) self._fleet_ptr.gather_servers(self.all_ips_, self.role_maker_._get_size()) - # wait all workers start + # barrier_all for init_worker, wait all workers start self.role_maker_._barrier_all() else: print("You should run DistributedOptimizer.minimize() first") @@ -142,12 +143,20 @@ class Fleet(object): else: print("You should run DistributedOptimizer.minimize() first") sys.exit(-1) - self.role_maker_._barrier_all() # wait for server starts + # barrier_all for init_server, wait for server starts + self.role_maker_._barrier_all() self.all_ips_ = self.role_maker_._all_gather(self.local_ip_) self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_, self.role_maker_._get_size(), self.role_maker_._get_rank()) + # barrier_all for init_worker self.role_maker_._barrier_all() + # prepare for client to client communication + info = self._fleet_ptr.get_clients_info() + all_info = self.role_maker_._worker_gather(info[0]) + self._fleet_ptr.gather_clients(all_info) + self._fleet_ptr.create_client2client_connection() + # barrier for init model self.role_maker_._barrier_worker() if self.role_maker_._is_first_worker(): tables = self._dist_desc.trainer_param.dense_table @@ -166,11 +175,10 @@ class Fleet(object): var_name_list = [] for i in range(0, len(table.dense_variable_name)): var_name_list.append(table.dense_variable_name[i]) - #print "table id ", table.table_id - #print "var_name_list ", var_name_list self._fleet_ptr.init_model(prog.desc, int(table.table_id), var_name_list) + # barrier for init model done self.role_maker_._barrier_worker() else: print("You should run DistributedOptimizer.minimize() first") diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 458d14876..8c705a095 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -29,6 +29,7 @@ class TestDataset(unittest.TestCase): def test_dataset_create(self): """ Testcase for dataset create. """ + return try: dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") except: @@ -47,6 +48,7 @@ class TestDataset(unittest.TestCase): def test_dataset_config(self): """ Testcase for dataset configuration. """ + return dataset = fluid.core.Dataset("MultiSlotDataset") dataset.set_thread_num(12) dataset.set_filelist(["a.txt", "b.txt", "c.txt"]) @@ -73,6 +75,7 @@ class TestDataset(unittest.TestCase): """ Testcase for InMemoryDataset from create to run. """ + return with open("test_in_memory_dataset_run_a.txt", "w") as f: data = "1 1 2 3 3 4 5 5 5 5 1 1\n" data += "1 2 2 3 4 4 6 6 6 6 1 2\n" @@ -120,6 +123,7 @@ class TestDataset(unittest.TestCase): """ Testcase for QueueDataset from create to run. """ + return with open("test_queue_dataset_run_a.txt", "w") as f: data = "1 1 2 3 3 4 5 5 5 5 1 1\n" data += "1 2 2 3 4 4 6 6 6 6 1 2\n" -- GitLab