From 9e4944725d7ad61ef2092dacdf0fecec78cac3fd Mon Sep 17 00:00:00 2001
From: danleifeng <52735331+danleifeng@users.noreply.github.com>
Date: Tue, 19 Oct 2021 15:49:57 +0800
Subject: [PATCH] [heterps]edit shrink and unseenday logit for pslib (#36194)

---
 paddle/fluid/framework/fleet/fleet_wrapper.cc | 23 ++++++++++++
 paddle/fluid/framework/fleet/fleet_wrapper.h  |  2 ++
 .../framework/fleet/heter_ps/hashtable_inl.h  |  2 +-
 .../fluid/framework/fleet/ps_gpu_wrapper.cc   | 13 +++++++
 paddle/fluid/framework/fleet/ps_gpu_wrapper.h |  9 +++++
 paddle/fluid/pybind/fleet_wrapper_py.cc       |  1 +
 paddle/fluid/pybind/ps_gpu_wrapper_py.cc      |  2 ++
 .../distributed/fleet/dataset/dataset.py      | 36 +++++++++++++++++++
 python/paddle/fluid/dataset.py                | 23 ++++++++++++
 .../fleet/parameter_server/pslib/__init__.py  |  9 +++++
 .../unittests/test_communicator_ps_gpu.py     |  2 +-
 11 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc
index 4346c144fab..7aeb9eaf3f1 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.cc
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc
@@ -1334,6 +1334,29 @@ void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id,
 #endif
 }
 
+void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) {
+#ifdef PADDLE_WITH_PSLIB
+  assert(date.size() == 8);
+  int year = std::stoi(date.substr(0, 4));
+  int month = std::stoi(date.substr(4, 2));
+  int day = std::stoi(date.substr(6, 2));
+  struct std::tm b;
+  b.tm_year = year - 1900;
+  b.tm_mon = month - 1;
+  b.tm_mday = day;
+  b.tm_hour = b.tm_min = b.tm_sec = 0;
+  std::time_t seconds_from_1970 = std::mktime(&b);
+  int day_id = seconds_from_1970 / 86400;
+  auto ret = pslib_ptr_->_worker_ptr->set_day_id(table_id, day_id);
+  ret.wait();
+  if (ret.get() != 0) {
+    LOG(ERROR) << "setdate : " << date << " failed";
+  }
+#else
+  VLOG(0) << "FleetWrapper::SetDate does nothing when no pslib";
+#endif
+}
+
 void FleetWrapper::PrintTableStat(const uint64_t table_id) {
 #ifdef PADDLE_WITH_PSLIB
   auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id);
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h
index d368b421ff2..6fddedccf02 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.h
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.h
@@ -336,6 +336,8 @@ class FleetWrapper {
   // this performs better than rand_r, especially large data
   std::default_random_engine& LocalRandomEngine();
 
+  void SetDate(const uint64_t table_id, const std::string& date);
+
 #ifdef PADDLE_WITH_PSLIB
   static std::shared_ptr<paddle::distributed::PSlib> pslib_ptr_;
 #endif
diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h b/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h
index 9facbff1f25..9f3d1a7adca 100644
--- a/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h
+++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h
@@ -128,7 +128,7 @@ void HashTable<KeyType, ValType>::dump_to_cpu(int devid, cudaStream_t stream) {
         downpour_value->resize(gpu_val.mf_size + downpour_value_size);
       }
       float* cpu_val = downpour_value->data();
-      cpu_val[0] = 0;
+      // cpu_val[0] = 0;
       cpu_val[1] = gpu_val.delta_score;
       cpu_val[2] = gpu_val.show;
       cpu_val[3] = gpu_val.clk;
diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
index d1e98a711dc..d3990c1f3dd 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -181,6 +181,19 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
     VLOG(3) << "GpuPs shard: " << i << " key len: " << local_keys[i].size();
     local_ptr[i].resize(local_keys[i].size());
   }
+
+#ifdef PADDLE_WITH_PSLIB
+  // get day_id: day nums from 1970
+  struct std::tm b;
+  b.tm_year = year_ - 1900;
+  b.tm_mon = month_ - 1;
+  b.tm_mday = day_;
+  b.tm_min = b.tm_hour = b.tm_sec = 0;
+  std::time_t seconds_from_1970 = std::mktime(&b);
+  int day_id = seconds_from_1970 / 86400;
+  fleet_ptr->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id);
+#endif
+
   timeline.Start();
   auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
     size_t key_size = local_keys[i].size();
diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h
index fa2ff6cbdb8..6f785cad33e 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h
@@ -240,6 +240,12 @@ class PSGPUWrapper {
                          mf_max_bound);
     }
   }
+  void SetDate(int year, int month, int day) {
+    year_ = year;
+    month_ = month;
+    day_ = day;
+  }
+
   void SetDataset(Dataset* dataset) { dataset_ = dataset; }
 
   // PSGPUWrapper singleton
@@ -283,6 +289,9 @@ class PSGPUWrapper {
   int thread_keys_thread_num_ = 37;
   int thread_keys_shard_num_ = 37;
   uint64_t max_fea_num_per_pass_ = 5000000000;
+  int year_;
+  int month_;
+  int day_;
 
   std::shared_ptr<
       paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc
index d8142f717ba..af1c3da727d 100644
--- a/paddle/fluid/pybind/fleet_wrapper_py.cc
+++ b/paddle/fluid/pybind/fleet_wrapper_py.cc
@@ -91,6 +91,7 @@ void BindFleetWrapper(py::module* m) {
       .def("save_model_one_table", &framework::FleetWrapper::SaveModelOneTable)
       .def("save_model_one_table_with_prefix",
            &framework::FleetWrapper::SaveModelOneTablePrefix)
+      .def("set_date", &framework::FleetWrapper::SetDate)
       .def("copy_table", &framework::FleetWrapper::CopyTable)
       .def("copy_table_by_feasign",
            &framework::FleetWrapper::CopyTableByFeasign);
diff --git a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc
index 48365f42b11..6e98a9479fa 100644
--- a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc
+++ b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc
@@ -41,6 +41,8 @@ void BindPSGPUWrapper(py::module* m) {
            py::call_guard<py::gil_scoped_release>())
       .def("init_GPU_server", &framework::PSGPUWrapper::InitializeGPUServer,
            py::call_guard<py::gil_scoped_release>())
+      .def("set_date", &framework::PSGPUWrapper::SetDate,
+           py::call_guard<py::gil_scoped_release>())
       .def("set_dataset", &framework::PSGPUWrapper::SetDataset,
           py::call_guard<py::gil_scoped_release>())
       .def("init_gpu_ps", &framework::PSGPUWrapper::InitializeGPU,
diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py
index 25a1d98cb11..e231ac55e67 100644
--- a/python/paddle/distributed/fleet/dataset/dataset.py
+++ b/python/paddle/distributed/fleet/dataset/dataset.py
@@ -748,6 +748,42 @@ class InMemoryDataset(DatasetBase):
         self.dataset.generate_local_tables_unlock(
             table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)
 
+    def set_date(self, date):
+        """
+        :api_attr: Static Graph
+
+        Set training date for pull sparse parameters, saving and loading model. Only used in psgpu
+
+        Args:
+            date(str): training date(format : YYYYMMDD). eg.20211111
+
+        Examples:
+            .. code-block:: python
+
+                import paddle
+                paddle.enable_static()
+
+                dataset = paddle.distributed.InMemoryDataset()
+                slots = ["slot1", "slot2", "slot3", "slot4"]
+                slots_vars = []
+                for slot in slots:
+                    var = paddle.static.data(
+                        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
+                    slots_vars.append(var)
+                dataset.init(
+                    batch_size=1,
+                    thread_num=2,
+                    input_type=1,
+                    pipe_command="cat",
+                    use_var=slots_vars)
+                dataset.set_date("20211111")
+        """
+        year = int(date[:4])
+        month = int(date[4:6])
+        day = int(date[6:])
+        if self.use_ps_gpu and core._is_compiled_with_heterps():
+            self.psgpu.set_date(year, month, day)
+
     def load_into_memory(self, is_shuffle=False):
         """
         :api_attr: Static Graph
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index d683e36fbe5..972f59d1e90 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -716,6 +716,29 @@ class InMemoryDataset(DatasetBase):
         self.dataset.generate_local_tables_unlock(
             table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)
 
+    def set_date(self, date):
+        """
+        :api_attr: Static Graph
+
+        Set training date for pull sparse parameters, saving and loading model. Only used in psgpu
+
+        Args:
+            date(str): training date(format : YYYYMMDD). eg.20211111
+
+        Examples:
+            .. code-block:: python
+
+                import paddle.fluid as fluid
+
+                dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+                dataset.set_date("20211111")
+        """
+        year = int(date[:4])
+        month = int(date[4:6])
+        day = int(date[6:])
+        if self.use_ps_gpu and core._is_compiled_with_heterps():
+            self.psgpu.set_date(year, month, day)
+
     @deprecated(
         since="2.0.0",
         update_to="paddle.distributed.InMemoryDataset.load_into_memory")
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
index 78af7fd65dc..309532cafc2 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
@@ -799,6 +799,15 @@ class PSLib(Fleet):
             self._fleet_ptr.save_model_one_table(table_id, model_dir, mode)
         self._role_maker._barrier_worker()
 
+    def set_date(self, table_id, date):
+        """
+        set_date, eg, 20210918
+        """
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            self._fleet_ptr.set_date(table_id, str(date))
+        self._role_maker._barrier_worker()
+
     def _set_opt_info(self, opt_info):
         """
         this function saves the result from DistributedOptimizer.minimize()
diff --git a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
index 6ab8a2c3a4b..1faa084d412 100644
--- a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
+++ b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
@@ -74,6 +74,7 @@ class TestCommunicator(unittest.TestCase):
             batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
         dataset.set_filelist(["test_communicator_ps_gpu.txt"])
         dataset._set_use_ps_gpu(1)
+        dataset.set_date("20211111")
         dataset.load_into_memory(is_shuffle=True)
 
         os.environ["TEST_MODE"] = "1"
@@ -88,7 +89,6 @@ class TestCommunicator(unittest.TestCase):
             pass
         except Exception as e:
             self.assertTrue(False)
-        time.sleep(10)
 
         fleet.stop_worker()
         os.remove("./test_communicator_ps_gpu.txt")
-- 
GitLab
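
For reference, the date handling added above (in `FleetWrapper::SetDate` and `PSGPUWrapper::BuildTask`) reduces to turning a `YYYYMMDD` string into a day index counted from the Unix epoch, which pslib's `set_day_id` then uses for its unseen-day/shrink bookkeeping. The sketch below reproduces that conversion in Python; the helper name `date_to_day_id` is not part of the patch, and unlike the C++ path (which goes through `std::mktime` and is therefore local-timezone dependent) it computes the index purely from calendar dates.

```python
import datetime


def date_to_day_id(date_str):
    """Convert a YYYYMMDD string into whole days since 1970-01-01.

    Hypothetical helper mirroring the patch's C++ conversion; the C++ code
    uses std::mktime(&tm) / 86400, so its result can differ by one day
    depending on the machine's timezone, while this version cannot.
    """
    assert len(date_str) == 8, "date must be in YYYYMMDD format"
    day = datetime.date(int(date_str[:4]), int(date_str[4:6]), int(date_str[6:]))
    return (day - datetime.date(1970, 1, 1)).days


print(date_to_day_id("20211111"))  # 18942
```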
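
As a rough usage sketch (not part of the patch), the new `set_date` entry points would typically be called once per training day, before data is loaded, so that both the GPU-side `PSGPUWrapper` and the pslib tables agree on the current day id. Everything below assumes an already initialized PSLib fleet and a heterps-enabled build; the table id, file paths, and slot names are placeholders.

```python
# Hypothetical per-day driver loop; assumes fleet.init(role_maker) has already
# run and that Paddle was built with PADDLE_WITH_HETERPS / PADDLE_WITH_PSLIB.
import paddle
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

paddle.enable_static()

slots = ["slot1", "slot2"]
slots_vars = [
    paddle.static.data(name=s, shape=[None, 1], dtype="int64", lod_level=1)
    for s in slots
]

dataset = paddle.distributed.InMemoryDataset()
dataset.init(batch_size=32, thread_num=2, pipe_command="cat", use_var=slots_vars)
dataset._set_use_ps_gpu(1)

for date in ["20211110", "20211111"]:             # placeholder training days
    dataset.set_filelist(["data/%s.txt" % date])  # placeholder file paths
    dataset.set_date(date)    # forwards (year, month, day) to PSGPUWrapper
    fleet.set_date(0, date)   # sets day_id on pslib table 0 (placeholder id)
    dataset.load_into_memory(is_shuffle=True)
    # ... train on this day's data, then save/shrink as needed ...
    dataset.release_memory()
```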