From 6a57e8075a6968c4102a8c298510931a1ea73686 Mon Sep 17 00:00:00 2001 From: xjqbest <173596896@qq.com> Date: Thu, 4 Apr 2019 20:22:19 +0800 Subject: [PATCH] remove trainer_id in datafeed and dataset test=develop --- paddle/fluid/framework/data_feed.cc | 25 ++----------------------- paddle/fluid/framework/data_feed.h | 4 ---- paddle/fluid/framework/data_set.cc | 11 ----------- paddle/fluid/framework/data_set.h | 7 ------- paddle/fluid/pybind/data_set_py.cc | 2 -- python/paddle/fluid/dataset.py | 3 --- 6 files changed, 2 insertions(+), 50 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 365117bf80..b5f7e6c224 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -237,11 +237,6 @@ void InMemoryDataFeed::SetThreadNum(int thread_num) { thread_num_ = thread_num; } -template -void InMemoryDataFeed::SetTrainerId(int trainer_id) { - trainer_id_ = trainer_id; -} - template void InMemoryDataFeed::SetTrainerNum(int trainer_num) { trainer_num_ = trainer_num; @@ -372,12 +367,10 @@ void InMemoryDataFeed::GlobalShuffle() { auto fleet_ptr = FleetWrapper::GetInstance(); std::vector> send_vec(trainer_num_); std::vector send_index(trainer_num_); - std::vector local_send_vec; uint64_t reserve_len = fleet_send_batch_size_ / trainer_num_; for (auto& vec : send_vec) { vec.reserve(reserve_len); } - local_send_vec.reserve(reserve_len); for (int i = 0; i < trainer_num_; ++i) { send_index[i] = i; } @@ -390,23 +383,12 @@ void InMemoryDataFeed::GlobalShuffle() { // std::string ins_id = memory_data_[i].ins_id; int64_t random_num = rand_r(&rand_seed); int64_t node_id = random_num % trainer_num_; - if (node_id == trainer_id_) { - local_send_vec.push_back((*memory_data_)[i]); - } else { - send_vec[node_id].push_back(&((*memory_data_)[i])); - } + send_vec[node_id].push_back(&((*memory_data_)[i])); if (i % fleet_send_batch_size_ == 0 && i != 0) { // shuffle the sequence of sending to avoid network timeout error std::random_shuffle(send_index.begin(), send_index.end()); for (int index = 0; index < send_index.size(); ++index) { int j = send_index[index]; - if (j == trainer_id_) { - VLOG(3) << "send to local, ins num=" << local_send_vec.size() - << ", node_id=" << j << ", thread_id=" << thread_id_; - shuffled_ins_->Extend(std::move(local_send_vec)); - local_send_vec.clear(); - continue; - } std::string send_str; SerializeIns(send_vec[j], &send_str); VLOG(3) << "send str_length=" << send_str.length() @@ -423,10 +405,7 @@ void InMemoryDataFeed::GlobalShuffle() { std::random_shuffle(send_index.begin(), send_index.end()); for (int index = 0; index < send_index.size(); ++index) { int j = send_index[index]; - if (j == trainer_id_ && local_send_vec.size() != 0) { - shuffled_ins_->Extend(std::move(local_send_vec)); - std::vector().swap(local_send_vec); - } else if (send_vec[j].size() != 0) { + if (send_vec[j].size() != 0) { std::string send_str; SerializeIns(send_vec[j], &send_str); VLOG(3) << "send str_length=" << send_str.length() << " to node_id=" << j diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index e657e1d635..648c874a0b 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -91,8 +91,6 @@ class DataFeed { // This function will do nothing at default virtual void SetThreadId(int thread_id) {} // This function will do nothing at default - virtual void SetTrainerId(int trainer_id) {} - // This function will do nothing at default virtual void SetThreadNum(int thread_num) {} // This function will do nothing at default virtual void SetTrainerNum(int trainer_num) {} @@ -215,7 +213,6 @@ class InMemoryDataFeed : public PrivateQueueDataFeed { virtual void SetMemoryDataMutex(std::mutex* mutex); virtual void SetThreadId(int thread_id); virtual void SetThreadNum(int thread_num); - virtual void SetTrainerId(int trainer_id); virtual void SetTrainerNum(int trainer_num); virtual void SetFleetSendBatchSize(int64_t size); virtual void PutInsToChannel(const std::string& ins_str); @@ -237,7 +234,6 @@ class InMemoryDataFeed : public PrivateQueueDataFeed { int thread_id_; int thread_num_; - int trainer_id_; int trainer_num_; uint32_t rand_seed; std::vector* memory_data_; diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 4df7d6af0b..a3b7b1e454 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -52,17 +52,6 @@ void DatasetImpl::SetThreadNum(int thread_num) { thread_num_ = thread_num; } -// if you run distributed, and want to do global shuffle, -// set this before global shuffle. -// be sure you call CreateReaders before SetTrainerId -template -void DatasetImpl::SetTrainerId(int trainer_id) { - trainer_id_ = trainer_id; - for (auto reader : readers_) { - reader->SetTrainerId(trainer_id); - } -} - // if you run distributed, and want to do global shuffle, // set this before global shuffle. // be sure you call CreateReaders before SetTrainerNum diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 42073934d6..bbe0f937ab 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -45,8 +45,6 @@ class Dataset { virtual void SetFileList(const std::vector& filelist) = 0; // set readers' num virtual void SetThreadNum(int thread_num) = 0; - // set worker rank - virtual void SetTrainerId(int trainer_id) = 0; // set workers' num virtual void SetTrainerNum(int trainer_num) = 0; // set fleet send batch size @@ -61,8 +59,6 @@ class Dataset { virtual const std::vector& GetFileList() = 0; // get thread num virtual int GetThreadNum() = 0; - // get worker rank - virtual int GetTrainerId() = 0; // get worker num virtual int GetTrainerNum() = 0; // get fleet send batch size @@ -105,7 +101,6 @@ class DatasetImpl : public Dataset { virtual void SetFileList(const std::vector& filelist); virtual void SetThreadNum(int thread_num); - virtual void SetTrainerId(int trainer_id); virtual void SetTrainerNum(int trainer_num); virtual void SetFleetSendBatchSize(int64_t size); virtual void SetHdfsConfig(const std::string& fs_name, @@ -114,7 +109,6 @@ class DatasetImpl : public Dataset { virtual const std::vector& GetFileList() { return filelist_; } virtual int GetThreadNum() { return thread_num_; } - virtual int GetTrainerId() { return trainer_id_; } virtual int GetTrainerNum() { return trainer_num_; } virtual int64_t GetFleetSendBatchSize() { return fleet_send_batch_size_; } virtual std::pair GetHdfsConfig() { @@ -142,7 +136,6 @@ class DatasetImpl : public Dataset { std::mutex mutex_for_update_memory_data_; int thread_num_; paddle::framework::DataFeedDesc data_feed_desc_; - int trainer_id_; int trainer_num_; std::vector filelist_; size_t file_idx_; diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index 0c7bd47523..3f171b65ab 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -49,7 +49,6 @@ void BindDataset(py::module* m) { })) .def("set_filelist", &framework::Dataset::SetFileList) .def("set_thread_num", &framework::Dataset::SetThreadNum) - .def("set_trainer_id", &framework::Dataset::SetTrainerId) .def("set_trainer_num", &framework::Dataset::SetTrainerNum) .def("set_fleet_send_batch_size", &framework::Dataset::SetFleetSendBatchSize) @@ -57,7 +56,6 @@ void BindDataset(py::module* m) { .def("set_data_feed_desc", &framework::Dataset::SetDataFeedDesc) .def("get_filelist", &framework::Dataset::GetFileList) .def("get_thread_num", &framework::Dataset::GetThreadNum) - .def("get_trainer_id", &framework::Dataset::GetTrainerId) .def("get_trainer_num", &framework::Dataset::GetTrainerNum) .def("get_fleet_send_batch_size", &framework::Dataset::GetFleetSendBatchSize) diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index 11ac326e8f..e655fd4a97 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -240,15 +240,12 @@ class InMemoryDataset(DatasetBase): Args: fleet: fleet singleton. Default None. """ - trainer_id = 0 trainer_num = 1 fleet_send_batch_size = 80000 if fleet is not None: fleet.fleet_instance.role_maker_._barrier_worker() - trainer_id = fleet.worker_index() trainer_num = fleet.worker_num() self.dataset.register_client2client_msg_handler() - self.dataset.set_trainer_id(trainer_id) self.dataset.set_trainer_num(trainer_num) self.dataset.set_fleet_send_batch_size(fleet_send_batch_size) if fleet is not None: -- GitLab