Unverified · Commit 1bebc092 authored by Thunderbrook, committed by GitHub

solve build gpu task core (#30626)

* build gpu task core

* format
Parent 33bf6eb7
@@ -30,11 +30,19 @@ namespace framework {
class HeterContext {
public:
~HeterContext() {
for (size_t i = 0; i < mutex_.size(); ++i) {
delete mutex_[i];
}
mutex_.clear();
}
Scope* scope_{nullptr};
std::vector<std::vector<FeatureKey>> feature_keys_;
std::vector<std::vector<paddle::ps::DownpourFixedFeatureValue*>> value_ptr_;
std::vector<std::vector<FeatureValue>> feature_values_;
std::vector<std::mutex*> mutex_lock_;
std::vector<std::vector<FeatureValue>> device_values_;
std::vector<std::vector<FeatureKey>> device_keys_;
std::vector<std::mutex*> mutex_;
uint32_t shard_num_ = 37;
uint64_t size() {
uint64_t total_size = 0;
@@ -45,19 +53,28 @@ class HeterContext {
}
void SetShardNum(uint32_t shard_num) { shard_num_ = shard_num; }
uint32_t ShardNum() { return shard_num_; }
void init() { feature_keys_.resize(shard_num_); }
void init(int shard_num, int device_num) {
shard_num_ = shard_num;
feature_keys_.resize(shard_num_);
value_ptr_.resize(shard_num_);
device_values_.resize(device_num);
device_keys_.resize(device_num);
mutex_.resize(device_num);
for (size_t i = 0; i < mutex_.size(); ++i) {
mutex_[i] = new std::mutex();
}
}
void batch_add_keys(const std::vector<std::vector<uint64_t>>& thread_keys) {
assert(thread_keys.size() == feature_keys_.size());
for (uint32_t i = 0; i < shard_num_; i++) {
int idx = 0;
// mutex_lock_[i]->lock();
idx = feature_keys_[i].size();
feature_keys_[i].resize(feature_keys_[i].size() + thread_keys[i].size());
for (uint64_t j = 0; j < thread_keys[i].size(); j++) {
feature_keys_[i][idx + j] = thread_keys[i][j];
}
// mutex_lock_[i]->unlock();
}
}
void UniqueKeys() {
......
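For orientation, a minimal self-contained sketch (not part of this commit) of the merge pattern that init() and batch_add_keys() implement above: per-thread key lists are appended into per-shard buckets with the same resize-then-copy step. Plain uint64_t stands in for FeatureKey, and the thread/shard counts below are made-up example values.

#include <cassert>
#include <cstdint>
#include <vector>

int main() {
  const uint32_t shard_num = 4;  // example value; the default above is 37
  // thread_keys[t][s]: keys produced by thread t that belong to shard s.
  std::vector<std::vector<std::vector<uint64_t>>> thread_keys = {
      {{1, 5}, {2}, {3}, {4}},
      {{9}, {6, 10}, {7}, {11}},
  };
  std::vector<std::vector<uint64_t>> feature_keys(shard_num);  // as after init()
  for (const auto& per_thread : thread_keys) {
    assert(per_thread.size() == feature_keys.size());
    for (uint32_t i = 0; i < shard_num; ++i) {
      // Same append-by-resize pattern as batch_add_keys().
      size_t idx = feature_keys[i].size();
      feature_keys[i].resize(idx + per_thread[i].size());
      for (size_t j = 0; j < per_thread[i].size(); ++j) {
        feature_keys[i][idx + j] = per_thread[i][j];
      }
    }
  }
  return 0;
}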
@@ -40,16 +40,22 @@ namespace framework {
std::shared_ptr<PSGPUWrapper> PSGPUWrapper::s_instance_ = NULL;
bool PSGPUWrapper::is_initialized_ = false;
void PSGPUWrapper::BuildTask(uint64_t table_id, int feature_dim) {
void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
uint64_t table_id, int feature_dim) {
VLOG(3) << "PSGPUWrapper::BuildGPUPSTask begin";
platform::Timer timeline;
timeline.Start();
int device_num = heter_devices_.size();
MultiSlotDataset* dataset = dynamic_cast<MultiSlotDataset*>(dataset_);
std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
gpu_task->init(thread_keys_shard_num_, device_num);
auto input_channel = dataset->GetInputChannel();
auto& local_keys = gpu_task->feature_keys_;
auto& local_values = gpu_task->feature_values_;
auto& local_ptr = gpu_task->value_ptr_;
auto& device_keys = gpu_task->device_keys_;
auto& device_vals = gpu_task->device_values_;
auto& device_mutex = gpu_task->mutex_;
std::vector<std::thread> threads;
auto fleet_ptr = FleetWrapper::GetInstance();
@@ -91,12 +97,11 @@ void PSGPUWrapper::BuildTask(uint64_t table_id, int feature_dim) {
t.join();
}
timeline.Pause();
VLOG(0) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
VLOG(1) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
timeline.Start();
// merge thread_keys to shard_keys
gpu_task->init();
for (size_t i = 0; i < thread_keys_.size(); i++) {
gpu_task->batch_add_keys(thread_keys_[i]);
for (int j = 0; j < thread_keys_thread_num_; j++) {
@@ -105,21 +110,20 @@ void PSGPUWrapper::BuildTask(uint64_t table_id, int feature_dim) {
}
timeline.Pause();
VLOG(0) << "GpuPs task unique11111 cost " << timeline.ElapsedSec()
VLOG(1) << "GpuPs task unique11111 cost " << timeline.ElapsedSec()
<< " seconds.";
VLOG(0) << "FK1";
timeline.Start();
gpu_task->UniqueKeys();
timeline.Pause();
VLOG(0) << "GpuPs task unique cost " << timeline.ElapsedSec() << " seconds.";
VLOG(1) << "GpuPs task unique cost " << timeline.ElapsedSec() << " seconds.";
for (int i = 0; i < thread_keys_shard_num_; i++) {
local_values[i].resize(local_keys[i].size());
VLOG(3) << "GpuPs shard: " << i << " key len: " << local_keys[i].size();
local_ptr[i].resize(local_keys[i].size());
}
auto ptl_func = [this, &local_keys, &local_values, &local_ptr, &table_id,
auto ptl_func = [this, &local_keys, &local_ptr, &table_id,
&fleet_ptr](int i) {
size_t key_size = local_keys[i].size();
auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
@@ -136,10 +140,42 @@ void PSGPUWrapper::BuildTask(uint64_t table_id, int feature_dim) {
VLOG(3) << "FleetWrapper Pull sparse to local done with table size: "
<< local_keys[i].size();
}
for (size_t num = 0; num < local_ptr[i].size(); ++num) {
float* ptr_val = local_ptr[i][num]->data();
FeatureValue& val = local_values[i][num];
size_t dim = local_ptr[i][num]->size();
};
for (size_t i = 0; i < threads.size(); i++) {
threads[i] = std::thread(ptl_func, i);
}
for (std::thread& t : threads) {
t.join();
}
timeline.Pause();
VLOG(1) << "GpuPs pull sparse cost " << timeline.ElapsedSec() << " seconds.";
timeline.Start();
auto build_func = [device_num, &local_keys, &local_ptr, &device_keys,
&device_vals, &device_mutex](int i) {
std::vector<std::vector<FeatureKey>> task_keys(device_num);
std::vector<std::vector<paddle::ps::DownpourFixedFeatureValue*>> task_ptrs(
device_num);
for (size_t j = 0; j < local_keys[i].size(); j++) {
int shard = local_keys[i][j] % device_num;
task_keys[shard].push_back(local_keys[i][j]);
task_ptrs[shard].push_back(local_ptr[i][j]);
}
for (int dev = 0; dev < device_num; dev++) {
device_mutex[dev]->lock();
int len = task_keys[dev].size();
int cur = device_keys[dev].size();
device_keys[dev].resize(device_keys[dev].size() + len);
device_vals[dev].resize(device_vals[dev].size() + len);
for (int j = 0; j < len; ++j) {
device_keys[dev][cur + j] = task_keys[dev][j];
float* ptr_val = task_ptrs[dev][j]->data();
FeatureValue& val = device_vals[dev][cur + j];
size_t dim = task_ptrs[dev][j]->size();
val.delta_score = ptr_val[1];
val.show = ptr_val[2];
@@ -160,44 +196,46 @@ void PSGPUWrapper::BuildTask(uint64_t table_id, int feature_dim) {
}
}
}
device_mutex[dev]->unlock();
}
};
for (size_t i = 0; i < threads.size(); i++) {
threads[i] = std::thread(ptl_func, i);
threads[i] = std::thread(build_func, i);
}
for (std::thread& t : threads) {
t.join();
}
timeline.Pause();
VLOG(0) << "GpuPs pull sparse cost " << timeline.ElapsedSec() << " seconds.";
VLOG(1) << "GpuPs prepare for build hbm cost " << timeline.ElapsedSec()
<< " seconds.";
}
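// Illustrative sketch, not part of this diff: the core of build_func above in
// isolation. Keys from one shard are bucketed to a device by key % device_num,
// then appended to that device's vector under its mutex (lock_guard here stands
// in for the explicit lock()/unlock() pair). All names below are stand-ins.
#include <cstdint>
#include <mutex>
#include <vector>

void scatter_shard_to_devices(const std::vector<uint64_t>& shard_keys,
                              int device_num,
                              std::vector<std::vector<uint64_t>>* device_keys,
                              std::vector<std::mutex>* device_mutex) {
  std::vector<std::vector<uint64_t>> task_keys(device_num);
  for (uint64_t key : shard_keys) {
    task_keys[key % device_num].push_back(key);  // same routing rule as build_func
  }
  for (int dev = 0; dev < device_num; ++dev) {
    std::lock_guard<std::mutex> guard((*device_mutex)[dev]);
    auto& dst = (*device_keys)[dev];
    dst.insert(dst.end(), task_keys[dev].begin(), task_keys[dev].end());
  }
}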
void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
BuildTask(table_id, feature_dim);
int device_num = heter_devices_.size();
std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
BuildTask(gpu_task, table_id, feature_dim);
platform::Timer timeline;
timeline.Start();
std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
int shard_num = gpu_task->feature_keys_.size();
if (shard_num == 0) {
return;
}
std::vector<size_t> feature_keys_count(shard_num);
std::vector<size_t> feature_keys_count(device_num);
size_t size_max = 0;
for (int i = 0; i < shard_num; i++) {
feature_keys_count[i] = gpu_task->feature_keys_[i].size();
for (int i = 0; i < device_num; i++) {
feature_keys_count[i] = gpu_task->device_keys_[i].size();
size_max = std::max(size_max, feature_keys_count[i]);
}
if (HeterPs_) {
HeterPs_->show_one_table(0);
return;
}
std::vector<std::thread> threads(shard_num);
std::vector<std::thread> threads(device_num);
HeterPs_ = HeterPsBase::get_instance(size_max, resource_);
auto build_func = [this, &gpu_task, &feature_keys_count](int i) {
std::cout << "building table: " << i << std::endl;
this->HeterPs_->build_ps(i, gpu_task->feature_keys_[i].data(),
gpu_task->feature_values_[i].data(),
feature_keys_count[i], 10000, 2);
this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(),
gpu_task->device_values_[i].data(),
feature_keys_count[i], 500000, 2);
HeterPs_->show_one_table(i);
};
for (size_t i = 0; i < threads.size(); i++) {
@@ -207,7 +245,7 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
t.join();
}
timeline.Pause();
VLOG(0) << "GpuPs build table total costs: " << timeline.ElapsedSec()
VLOG(1) << "GpuPs build table total costs: " << timeline.ElapsedSec()
<< " s.";
}
......
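A minimal sketch (not part of this commit) of the one-thread-per-device build loop used by BuildGPUPS above: each device gets its own thread that builds a table from that device's keys. The printf is a placeholder for the real HeterPs_->build_ps(dev, keys, vals, len, 500000, 2) call shown in the diff, and device_num and the key data are made-up values.

#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
  const int device_num = 2;  // stand-in for heter_devices_.size()
  std::vector<std::vector<uint64_t>> device_keys(device_num,
                                                 std::vector<uint64_t>{1, 2, 3});
  auto build_one_table = [&](int dev) {
    // Placeholder for HeterPs_->build_ps(dev, keys, vals, len, 500000, 2).
    std::printf("device %d: building table with %zu keys\n", dev,
                device_keys[dev].size());
  };
  std::vector<std::thread> threads;
  for (int dev = 0; dev < device_num; ++dev) {
    threads.emplace_back(build_one_table, dev);
  }
  for (auto& t : threads) {
    t.join();
  }
  return 0;
}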
@@ -76,7 +76,8 @@ class PSGPUWrapper {
const int batch_size);
void BuildGPUPS(const uint64_t table_id, int feature_dim);
void BuildTask(uint64_t table_id, int feature_dim);
void BuildTask(std::shared_ptr<HeterContext> gpu_task, uint64_t table_id,
int feature_dim);
void InitializeGPU(const std::vector<int>& dev_ids) {
if (s_instance_ != NULL) {
VLOG(3) << "PSGPUWrapper Begin InitializeGPU";
......
@@ -74,8 +74,6 @@ void PSGPUTrainer::Initialize(const TrainerDesc& trainer_desc,
workers_[i]->Initialize(trainer_desc);
workers_[i]->SetWorkerNum(place_num);
}
auto gpu_ps_wrapper = PSGPUWrapper::GetInstance();
gpu_ps_wrapper->InitializeGPU(dev_ids);
return;
}
......
@@ -41,6 +41,10 @@ void BindPSGPUWrapper(py::module* m) {
py::call_guard<py::gil_scoped_release>())
.def("init_GPU_server", &framework::PSGPUWrapper::InitializeGPUServer,
py::call_guard<py::gil_scoped_release>())
.def("set_dataset", &framework::PSGPUWrapper::SetDataset,
py::call_guard<py::gil_scoped_release>())
.def("init_gpu_ps", &framework::PSGPUWrapper::InitializeGPU,
py::call_guard<py::gil_scoped_release>())
.def("build_gpu_ps", &framework::PSGPUWrapper::BuildGPUPS,
py::call_guard<py::gil_scoped_release>());
} // end PSGPUWrapper
......