diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 4346c144fab7f2e516bd36ab5c964b0b8677528c..7aeb9eaf3f19582d630996c18809503613859714 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -1334,6 +1334,29 @@ void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id, #endif } +void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) { +#ifdef PADDLE_WITH_PSLIB + assert(date.size() == 8); + int year = std::stoi(date.substr(0, 4)); + int month = std::stoi(date.substr(4, 2)); + int day = std::stoi(date.substr(6, 2)); + struct std::tm b; + b.tm_year = year - 1900; + b.tm_mon = month - 1; + b.tm_mday = day; + b.tm_hour = b.tm_min = b.tm_sec = 0; + std::time_t seconds_from_1970 = std::mktime(&b); + int day_id = seconds_from_1970 / 86400; + auto ret = pslib_ptr_->_worker_ptr->set_day_id(table_id, day_id); + ret.wait(); + if (ret.get() != 0) { + LOG(ERROR) << "setdate : " << date << " failed"; + } +#else + VLOG(0) << "FleetWrapper::SetDate does nothing when no pslib"; +#endif +} + void FleetWrapper::PrintTableStat(const uint64_t table_id) { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id); diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index d368b421ff2a055aa91b0b920ee8c2f64daa8784..6fddedccf025858de0b9235d1b86152bf267f37c 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -336,6 +336,8 @@ class FleetWrapper { // this performs better than rand_r, especially large data std::default_random_engine& LocalRandomEngine(); + void SetDate(const uint64_t table_id, const std::string& date); + #ifdef PADDLE_WITH_PSLIB static std::shared_ptr pslib_ptr_; #endif diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h b/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h index 9facbff1f25269dc039c982e9b62229f9217b336..9f3d1a7adcafcc61426da2a5ab6e5fa7702fa972 100644 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h @@ -128,7 +128,7 @@ void HashTable::dump_to_cpu(int devid, cudaStream_t stream) { downpour_value->resize(gpu_val.mf_size + downpour_value_size); } float* cpu_val = downpour_value->data(); - cpu_val[0] = 0; + // cpu_val[0] = 0; cpu_val[1] = gpu_val.delta_score; cpu_val[2] = gpu_val.show; cpu_val[3] = gpu_val.clk; diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index d1e98a711dc9dd532daf73d46e44b39ab0f3e3ae..d3990c1f3dd769b1882385b572ece9717c30887b 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -181,6 +181,19 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task) { VLOG(3) << "GpuPs shard: " << i << " key len: " << local_keys[i].size(); local_ptr[i].resize(local_keys[i].size()); } + +#ifdef PADDLE_WITH_PSLIB + // get day_id: day nums from 1970 + struct std::tm b; + b.tm_year = year_ - 1900; + b.tm_mon = month_ - 1; + b.tm_mday = day_; + b.tm_min = b.tm_hour = b.tm_sec = 0; + std::time_t seconds_from_1970 = std::mktime(&b); + int day_id = seconds_from_1970 / 86400; + fleet_ptr->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id); +#endif + timeline.Start(); auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) { size_t key_size = local_keys[i].size(); diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h index fa2ff6cbdb8c78abd1b5f93696938eff316451e0..6f785cad33e2d249629cb4dca5c3dca8bb65b08f 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -240,6 +240,12 @@ class PSGPUWrapper { mf_max_bound); } } + void SetDate(int year, int month, int day) { + year_ = year; + month_ = month; + day_ = day; + } + void SetDataset(Dataset* dataset) { dataset_ = dataset; } // PSGPUWrapper singleton @@ -283,6 +289,9 @@ class PSGPUWrapper { int thread_keys_thread_num_ = 37; int thread_keys_shard_num_ = 37; uint64_t max_fea_num_per_pass_ = 5000000000; + int year_; + int month_; + int day_; std::shared_ptr< paddle::framework::ChannelObject>> diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc index d8142f717baed8e0bea3858027aa33899627f55c..af1c3da727d4178b8bab132fd9684aef48d445be 100644 --- a/paddle/fluid/pybind/fleet_wrapper_py.cc +++ b/paddle/fluid/pybind/fleet_wrapper_py.cc @@ -91,6 +91,7 @@ void BindFleetWrapper(py::module* m) { .def("save_model_one_table", &framework::FleetWrapper::SaveModelOneTable) .def("save_model_one_table_with_prefix", &framework::FleetWrapper::SaveModelOneTablePrefix) + .def("set_date", &framework::FleetWrapper::SetDate) .def("copy_table", &framework::FleetWrapper::CopyTable) .def("copy_table_by_feasign", &framework::FleetWrapper::CopyTableByFeasign); diff --git a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc index 48365f42b11ba9a7afc4cb3578c2bbbc7002fc84..6e98a9479fa26a1d0ace170785685e4a27d2ca0b 100644 --- a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc +++ b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc @@ -41,6 +41,8 @@ void BindPSGPUWrapper(py::module* m) { py::call_guard()) .def("init_GPU_server", &framework::PSGPUWrapper::InitializeGPUServer, py::call_guard()) + .def("set_date", &framework::PSGPUWrapper::SetDate, + py::call_guard()) .def("set_dataset", &framework::PSGPUWrapper::SetDataset, py::call_guard()) .def("init_gpu_ps", &framework::PSGPUWrapper::InitializeGPU, diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 25a1d98cb112182877809ecc3b132a1b0b2b0e68..e231ac55e679a27718d56ca2f1a14adb360a85c1 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -748,6 +748,42 @@ class InMemoryDataset(DatasetBase): self.dataset.generate_local_tables_unlock( table_id, fea_dim, read_thread_num, consume_thread_num, shard_num) + def set_date(self, date): + """ + :api_attr: Static Graph + + Set training date for pull sparse parameters, saving and loading model. Only used in psgpu + + Args: + date(str): training date(format : YYMMDD). eg.20211111 + + Examples: + .. code-block:: python + + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + dataset.set_date("20211111") + """ + year = int(date[:4]) + month = int(date[4:6]) + day = int(date[6:]) + if self.use_ps_gpu and core._is_compiled_with_heterps(): + self.psgpu.set_date(year, month, day) + def load_into_memory(self, is_shuffle=False): """ :api_attr: Static Graph diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index d683e36fbe5ab31fe5e5f7e0dd046fbff3dadfde..972f59d1e9058a9bafd80bd862402f8f5c1a0b14 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -716,6 +716,29 @@ class InMemoryDataset(DatasetBase): self.dataset.generate_local_tables_unlock( table_id, fea_dim, read_thread_num, consume_thread_num, shard_num) + def set_date(self, date): + """ + :api_attr: Static Graph + + Set training date for pull sparse parameters, saving and loading model. Only used in psgpu + + Args: + date(str): training date(format : YYMMDD). eg.20211111 + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_date("20211111") + """ + year = int(date[:4]) + month = int(date[4:6]) + day = int(date[6:]) + if self.use_ps_gpu and core._is_compiled_with_heterps(): + self.psgpu.set_date(year, month, day) + @deprecated( since="2.0.0", update_to="paddle.distributed.InMemoryDataset.load_into_memory") diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 78af7fd65dccbb390fc8110d591fc626a55eae4e..309532cafc2e162eb7a97a2f61076752a391009e 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -799,6 +799,15 @@ class PSLib(Fleet): self._fleet_ptr.save_model_one_table(table_id, model_dir, mode) self._role_maker._barrier_worker() + def set_date(self, table_id, date): + """ + set_date, eg, 20210918 + """ + self._role_maker._barrier_worker() + if self._role_maker.is_first_worker(): + self._fleet_ptr.set_date(table_id, str(date)) + self._role_maker._barrier_worker() + def _set_opt_info(self, opt_info): """ this function saves the result from DistributedOptimizer.minimize() diff --git a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py index 6ab8a2c3a4b220fe2b6fe30e1dbd3be0e35cf5a4..1faa084d412e42294d865acc123a87f238cad666 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py @@ -74,6 +74,7 @@ class TestCommunicator(unittest.TestCase): batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) dataset.set_filelist(["test_communicator_ps_gpu.txt"]) dataset._set_use_ps_gpu(1) + dataset.set_date("20211111") dataset.load_into_memory(is_shuffle=True) os.environ["TEST_MODE"] = "1" @@ -88,7 +89,6 @@ class TestCommunicator(unittest.TestCase): pass except Exception as e: self.assertTrue(False) - time.sleep(10) fleet.stop_worker() os.remove("./test_communicator_ps_gpu.txt")