diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index 65892f8488475abfcde0ca3fa12789baceb0a1b8..ee2063a5d9abc07cd82c5fd66e0d9edf926aecad 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -838,29 +838,42 @@ void PSGPUWrapper::EndPass() { std::max(keysize_max, current_task_->device_dim_keys_[i][j].size()); } } - - auto dump_pool_to_cpu_func = [this](int i, int j) { + int thread_num = 8; + auto dump_pool_to_cpu_func = [this, thread_num](int i, int j, int z) { PADDLE_ENFORCE_GPU_SUCCESS(cudaSetDevice(this->resource_->dev_id(i))); auto& hbm_pool = this->hbm_pools_[i * this->multi_mf_dim_ + j]; auto& device_keys = this->current_task_->device_dim_keys_[i][j]; size_t len = device_keys.size(); + // ====== multi-thread process feasign================ + int len_per_thread = len / thread_num; + int remain = len % thread_num; + int left = -1, right = -1; + int real_len = len_per_thread; + if (z < remain) real_len++; + if (z < remain) { + left = z * (len_per_thread + 1); + right = left + real_len; + } else { + left = remain * (len_per_thread + 1) + (z - remain) * len_per_thread; + right = left + real_len; + } + // ============ multi-thread process feasign============ int mf_dim = this->index_dim_vec_[j]; VLOG(0) << "dump pool to cpu table: " << i << "with mf dim: " << mf_dim; size_t feature_value_size = TYPEALIGN(8, sizeof(FeatureValue) + ((mf_dim + 1) * sizeof(float))); - - char* test_build_values = (char*)malloc(feature_value_size * len); - cudaMemcpy(test_build_values, hbm_pool->mem(), feature_value_size * len, - cudaMemcpyDeviceToHost); - + char* test_build_values = (char*)malloc(feature_value_size * real_len); + uint64_t offset = left * feature_value_size; + cudaMemcpy(test_build_values, hbm_pool->mem() + offset, + feature_value_size * real_len, cudaMemcpyDeviceToHost); CHECK(len == hbm_pool->capacity()); uint64_t unuse_key = std::numeric_limits::max(); - for (size_t i = 0; i < len; ++i) { + for (int i = left; i < right; ++i) { if (device_keys[i] == unuse_key) { continue; } - size_t offset = i * feature_value_size; - FeatureValue* gpu_val = (FeatureValue*)(test_build_values + offset); + size_t local_offset = (i - left) * feature_value_size; + FeatureValue* gpu_val = (FeatureValue*)(test_build_values + local_offset); #ifdef PADDLE_WITH_PSLIB auto* downpour_value = (paddle::ps::DownpourFixedFeatureValue*)(gpu_val->cpu_ptr); @@ -912,10 +925,13 @@ void PSGPUWrapper::EndPass() { if (multi_mf_dim_) { VLOG(0) << "psgpu wrapper dump pool: multi_mf_dim_: " << multi_mf_dim_; size_t device_num = heter_devices_.size(); - std::vector threads(device_num * multi_mf_dim_); + std::vector threads(device_num * multi_mf_dim_ * thread_num); for (size_t i = 0; i < device_num; i++) { for (int j = 0; j < multi_mf_dim_; j++) { - threads[i + j * device_num] = std::thread(dump_pool_to_cpu_func, i, j); + for (int k = 0; k < thread_num; k++) { + threads[(i + j * device_num) * thread_num + k] = + std::thread(dump_pool_to_cpu_func, i, j, k); + } } } for (std::thread& t : threads) {