Unverified commit 9e494472, authored by danleifeng, committed by GitHub

[heterps]edit shrink and unseenday logit for pslib (#36194)

Parent be6a8330
@@ -1334,6 +1334,29 @@ void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id,
 #endif
 }
+void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) {
+#ifdef PADDLE_WITH_PSLIB
+  assert(date.size() == 8);
+  int year = std::stoi(date.substr(0, 4));
+  int month = std::stoi(date.substr(4, 2));
+  int day = std::stoi(date.substr(6, 2));
+  struct std::tm b;
+  b.tm_year = year - 1900;
+  b.tm_mon = month - 1;
+  b.tm_mday = day;
+  b.tm_hour = b.tm_min = b.tm_sec = 0;
+  std::time_t seconds_from_1970 = std::mktime(&b);
+  int day_id = seconds_from_1970 / 86400;
+  auto ret = pslib_ptr_->_worker_ptr->set_day_id(table_id, day_id);
+  ret.wait();
+  if (ret.get() != 0) {
+    LOG(ERROR) << "setdate : " << date << " failed";
+  }
+#else
+  VLOG(0) << "FleetWrapper::SetDate does nothing when no pslib";
+#endif
+}
 void FleetWrapper::PrintTableStat(const uint64_t table_id) {
 #ifdef PADDLE_WITH_PSLIB
   auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id);
......
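For reference, the day_id computed by SetDate above is just the number of whole days between the given date and 1970-01-01 (std::mktime interprets the tm struct in local time, so this assumes the local offset does not push seconds_from_1970 across a day boundary). A minimal Python sketch of the same conversion, useful for sanity-checking values (the helper name day_id is illustrative, not part of the commit):

    # Sketch of the day_id computation in FleetWrapper::SetDate; assumed
    # equivalent when local time and UTC fall on the same calendar day.
    import datetime

    def day_id(date_str):
        """Convert a "YYYYMMDD" string to whole days since 1970-01-01."""
        d = datetime.datetime.strptime(date_str, "%Y%m%d").date()
        return (d - datetime.date(1970, 1, 1)).days

    print(day_id("20211111"))  # 18942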
@@ -336,6 +336,8 @@ class FleetWrapper {
   // this performs better than rand_r, especially large data
   std::default_random_engine& LocalRandomEngine();
+  void SetDate(const uint64_t table_id, const std::string& date);
 #ifdef PADDLE_WITH_PSLIB
   static std::shared_ptr<paddle::distributed::PSlib> pslib_ptr_;
 #endif
......
@@ -128,7 +128,7 @@ void HashTable<KeyType, ValType>::dump_to_cpu(int devid, cudaStream_t stream) {
       downpour_value->resize(gpu_val.mf_size + downpour_value_size);
     }
     float* cpu_val = downpour_value->data();
-    cpu_val[0] = 0;
+    // cpu_val[0] = 0;
     cpu_val[1] = gpu_val.delta_score;
     cpu_val[2] = gpu_val.show;
     cpu_val[3] = gpu_val.clk;
......
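A note on the one-line change above: index 0 of the CPU-side feature value appears to hold the unseen-day counter in the pslib CTR accessor (the following fields are delta_score, show, and clk), so dump_to_cpu no longer resets it to zero when copying GPU values back. Presumably this is what lets the shrink/unseen-day logic referenced in the commit title see the real counter instead of a hard-coded 0; that reading is inferred from context, not stated in the diff.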
@@ -181,6 +181,19 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
     VLOG(3) << "GpuPs shard: " << i << " key len: " << local_keys[i].size();
     local_ptr[i].resize(local_keys[i].size());
   }
+#ifdef PADDLE_WITH_PSLIB
+  // get day_id: day nums from 1970
+  struct std::tm b;
+  b.tm_year = year_ - 1900;
+  b.tm_mon = month_ - 1;
+  b.tm_mday = day_;
+  b.tm_min = b.tm_hour = b.tm_sec = 0;
+  std::time_t seconds_from_1970 = std::mktime(&b);
+  int day_id = seconds_from_1970 / 86400;
+  fleet_ptr->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id);
+#endif
   timeline.Start();
   auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
     size_t key_size = local_keys[i].size();
......
@@ -240,6 +240,12 @@ class PSGPUWrapper {
                     mf_max_bound);
     }
   }
+  void SetDate(int year, int month, int day) {
+    year_ = year;
+    month_ = month;
+    day_ = day;
+  }
   void SetDataset(Dataset* dataset) { dataset_ = dataset; }
   // PSGPUWrapper singleton
@@ -283,6 +289,9 @@ class PSGPUWrapper {
   int thread_keys_thread_num_ = 37;
   int thread_keys_shard_num_ = 37;
   uint64_t max_fea_num_per_pass_ = 5000000000;
+  int year_;
+  int month_;
+  int day_;
   std::shared_ptr<
       paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
......
@@ -91,6 +91,7 @@ void BindFleetWrapper(py::module* m) {
       .def("save_model_one_table", &framework::FleetWrapper::SaveModelOneTable)
       .def("save_model_one_table_with_prefix",
            &framework::FleetWrapper::SaveModelOneTablePrefix)
+      .def("set_date", &framework::FleetWrapper::SetDate)
       .def("copy_table", &framework::FleetWrapper::CopyTable)
       .def("copy_table_by_feasign",
           &framework::FleetWrapper::CopyTableByFeasign);
......
@@ -41,6 +41,8 @@ void BindPSGPUWrapper(py::module* m) {
            py::call_guard<py::gil_scoped_release>())
       .def("init_GPU_server", &framework::PSGPUWrapper::InitializeGPUServer,
            py::call_guard<py::gil_scoped_release>())
+      .def("set_date", &framework::PSGPUWrapper::SetDate,
+           py::call_guard<py::gil_scoped_release>())
       .def("set_dataset", &framework::PSGPUWrapper::SetDataset,
            py::call_guard<py::gil_scoped_release>())
       .def("init_gpu_ps", &framework::PSGPUWrapper::InitializeGPU,
......
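Taken together, the two bindings above expose the new setters to Python. A hedged sketch of how they are reached (object construction follows the existing patterns in Paddle's Python layer, e.g. dataset.py holding a core.PSGPU() instance; the table_id value is illustrative):

    # Hedged sketch; requires a build with PADDLE_WITH_PSLIB / HeterPS.
    from paddle.fluid import core

    fleet_ptr = core.Fleet()            # class bound in BindFleetWrapper
    fleet_ptr.set_date(0, "20211111")   # (table_id, "YYYYMMDD"); table_id 0 is illustrative

    psgpu = core.PSGPU()                # class bound in BindPSGPUWrapper
    psgpu.set_date(2021, 11, 11)        # year, month, day as separate ints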
@@ -748,6 +748,42 @@ class InMemoryDataset(DatasetBase):
         self.dataset.generate_local_tables_unlock(
             table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)
+    def set_date(self, date):
+        """
+        :api_attr: Static Graph
+
+        Set the training date for pulling sparse parameters and for saving and
+        loading the model. Only used in psgpu mode.
+
+        Args:
+            date(str): training date (format: YYYYMMDD), e.g. "20211111".
+
+        Examples:
+            .. code-block:: python
+
+                import paddle
+                paddle.enable_static()
+
+                dataset = paddle.distributed.InMemoryDataset()
+                slots = ["slot1", "slot2", "slot3", "slot4"]
+                slots_vars = []
+                for slot in slots:
+                    var = paddle.static.data(
+                        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
+                    slots_vars.append(var)
+                dataset.init(
+                    batch_size=1,
+                    thread_num=2,
+                    input_type=1,
+                    pipe_command="cat",
+                    use_var=slots_vars)
+                dataset.set_date("20211111")
+        """
+        year = int(date[:4])
+        month = int(date[4:6])
+        day = int(date[6:])
+        if self.use_ps_gpu and core._is_compiled_with_heterps():
+            self.psgpu.set_date(year, month, day)
+
     def load_into_memory(self, is_shuffle=False):
         """
         :api_attr: Static Graph
......
@@ -716,6 +716,29 @@ class InMemoryDataset(DatasetBase):
         self.dataset.generate_local_tables_unlock(
             table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)
+    def set_date(self, date):
+        """
+        :api_attr: Static Graph
+
+        Set the training date for pulling sparse parameters and for saving and
+        loading the model. Only used in psgpu mode.
+
+        Args:
+            date(str): training date (format: YYYYMMDD), e.g. "20211111".
+
+        Examples:
+            .. code-block:: python
+
+                import paddle.fluid as fluid
+
+                dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+                dataset.set_date("20211111")
+        """
+        year = int(date[:4])
+        month = int(date[4:6])
+        day = int(date[6:])
+        if self.use_ps_gpu and core._is_compiled_with_heterps():
+            self.psgpu.set_date(year, month, day)
+
     @deprecated(
         since="2.0.0",
         update_to="paddle.distributed.InMemoryDataset.load_into_memory")
......
@@ -799,6 +799,15 @@ class PSLib(Fleet):
             self._fleet_ptr.save_model_one_table(table_id, model_dir, mode)
         self._role_maker._barrier_worker()
+    def set_date(self, table_id, date):
+        """
+        Set the training date for the given table, e.g. set_date(table_id, "20210918").
+        """
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            self._fleet_ptr.set_date(table_id, str(date))
+        self._role_maker._barrier_worker()
+
     def _set_opt_info(self, opt_info):
         """
         this function saves the result from DistributedOptimizer.minimize()
......
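At the API surface, PSLib users reach this through the fleet singleton; a minimal hedged usage sketch (the table_id value is illustrative, and as the barrier logic above shows, only the first worker actually issues the call):

    # Hedged sketch: set the date on a sparse table before training a new day.
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    fleet.set_date(0, "20210918")  # table_id 0 is illustrative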
@@ -74,6 +74,7 @@ class TestCommunicator(unittest.TestCase):
             batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
         dataset.set_filelist(["test_communicator_ps_gpu.txt"])
         dataset._set_use_ps_gpu(1)
+        dataset.set_date("20211111")
         dataset.load_into_memory(is_shuffle=True)
         os.environ["TEST_MODE"] = "1"
@@ -88,7 +89,6 @@ class TestCommunicator(unittest.TestCase):
             pass
         except Exception as e:
            self.assertTrue(False)
-
         time.sleep(10)
         fleet.stop_worker()
         os.remove("./test_communicator_ps_gpu.txt")
......