// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#ifdef PADDLE_WITH_HETERPS

#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"

#include <algorithm>
#include <deque>

#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/platform/timer.h"
#if defined(PADDLE_WITH_PSCORE)
#include "paddle/fluid/distributed/ps/table/depends/feature_value.h"
#endif

namespace paddle {
namespace framework {

#ifdef PADDLE_WITH_PSLIB
void AfsWrapper::init(const std::string& fs_name,
                      const std::string& fs_user,
                      const std::string& pass_wd,
                      const std::string& conf) {
  int ret = afs_handler_.init(
      fs_name.c_str(), fs_user.c_str(), pass_wd.c_str(), conf.c_str());
  if (ret != 0) {
    LOG(ERROR) << "AFS Init Error";
  }
}

int AfsWrapper::remove(const std::string& path) {
  return afs_handler_.remove(path);
}

int AfsWrapper::mkdir(const std::string& path) {
  return afs_handler_.mkdir(path);
}

std::vector<std::string> AfsWrapper::list(const std::string& path) {
  return afs_handler_.list(path);
}

int AfsWrapper::exist(const std::string& path) {
  return afs_handler_.exist(path);
}

int AfsWrapper::upload(const std::string& local_file,
                       const std::string& afs_file) {
  return afs_handler_.upload_file(local_file, afs_file);
}

int AfsWrapper::download(const std::string& local_file,
                         const std::string& afs_file) {
  return afs_handler_.download_file(local_file, afs_file);
}

int AfsWrapper::touchz(const std::string& path) {
  return afs_handler_.touchz(path);
}

std::string AfsWrapper::cat(const std::string& path) {
  return afs_handler_.cat(path);
}

int AfsWrapper::mv(const std::string& old_path, const std::string& dest_path) {
  return afs_handler_.mv(old_path, dest_path);
}
#endif

std::shared_ptr<PSGPUWrapper> PSGPUWrapper::s_instance_ = NULL;
bool PSGPUWrapper::is_initialized_ = false;
std::mutex PSGPUWrapper::ins_mutex;
#ifdef PADDLE_WITH_PSLIB
void PSGPUWrapper::InitAfsApi(const std::string& fs_name,
                              const std::string& fs_user,
                              const std::string& pass_wd,
                              const std::string& conf) {
  int ret = afs_handler_.init(
      fs_name.c_str(), fs_user.c_str(), pass_wd.c_str(), conf.c_str());
  if (ret != 0) {
    VLOG(0) << "AFS Init Error";
  }
  use_afs_api_ = 1;
}
#endif
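// PreBuildTask: scan the dataset's input channel with thread_keys_thread_num_
// threads, bucket every uint64 feasign by shard id (and by embedding-dim
// index for SlotRecordDataset), then merge the per-thread buckets into the
// HeterContext and deduplicate the keys.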
void PSGPUWrapper::PreBuildTask(std::shared_ptr<HeterContext> gpu_task) {
  VLOG(3) << "PSGPUWrapper::BuildGPUPSTask begin";
  platform::Timer timeline;
  timeline.Start();
  int device_num = heter_devices_.size();
  gpu_task->init(thread_keys_shard_num_, device_num, multi_mf_dim_);

  std::vector<std::thread> threads;

  // data should be in input channel

  thread_dim_keys_.resize(thread_keys_thread_num_);
  for (int i = 0; i < thread_keys_thread_num_; i++) {
    thread_dim_keys_[i].resize(thread_keys_shard_num_);
    for (int j = 0; j < thread_keys_shard_num_; j++) {
      thread_dim_keys_[i][j].resize(multi_mf_dim_);
    }
  }

  size_t total_len = 0;
  size_t len_per_thread = 0;
  int remain = 0;
  size_t begin = 0;

  std::string data_set_name = std::string(typeid(*dataset_).name());

  if (data_set_name.find("SlotRecordDataset") != std::string::npos) {
    SlotRecordDataset* dataset = (SlotRecordDataset*)(dataset_);
    auto input_channel = dataset->GetInputChannel();
    VLOG(0) << "psgpu wrapperinputslotchannle size: " << input_channel->Size();
    const std::deque<SlotRecord>& vec_data = input_channel->GetData();
    total_len = vec_data.size();
    len_per_thread = total_len / thread_keys_thread_num_;
    remain = total_len % thread_keys_thread_num_;
    VLOG(0) << "total len: " << total_len;
    auto gen_dynamic_mf_func = [this](const std::deque<SlotRecord>& total_data,
                                      int begin_index,
                                      int end_index,
                                      int i) {
      for (auto iter = total_data.begin() + begin_index;
           iter != total_data.begin() + end_index;
           iter++) {
        const auto& ins = *iter;
        const auto& feasign_v = ins->slot_uint64_feasigns_.slot_values;
        const auto& slot_offset = ins->slot_uint64_feasigns_.slot_offsets;
        for (size_t slot_idx = 0; slot_idx < slot_offset_vector_.size();
             slot_idx++) {
          for (size_t j = slot_offset[slot_offset_vector_[slot_idx]];
               j < slot_offset[slot_offset_vector_[slot_idx] + 1];
               j++) {
            int shard_id = feasign_v[j] % thread_keys_shard_num_;
            int dim_id = slot_index_vec_[slot_idx];
            if (feasign_v[j] != 0) {
              this->thread_dim_keys_[i][shard_id][dim_id].insert(feasign_v[j]);
            }
          }
        }
      }
    };
    for (int i = 0; i < thread_keys_thread_num_; i++) {
      threads.push_back(
          std::thread(gen_dynamic_mf_func,
                      std::ref(vec_data),
                      begin,
                      begin + len_per_thread + (i < remain ? 1 : 0),
                      i));

      begin += len_per_thread + (i < remain ? 1 : 0);
    }
    for (std::thread& t : threads) {
      t.join();
    }
    timeline.Pause();
    VLOG(0) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
  } else {
    CHECK(data_set_name.find("MultiSlotDataset") != std::string::npos);
    VLOG(0) << "ps_gpu_wrapper use MultiSlotDataset";
    MultiSlotDataset* dataset = (MultiSlotDataset*)(dataset_);
    auto input_channel = dataset->GetInputChannel();

    const std::deque<Record>& vec_data = input_channel->GetData();
    total_len = vec_data.size();
    len_per_thread = total_len / thread_keys_thread_num_;
    remain = total_len % thread_keys_thread_num_;
    auto gen_func = [this](const std::deque<Record>& total_data,
                           int begin_index,
                           int end_index,
                           int i) {
      for (auto iter = total_data.begin() + begin_index;
           iter != total_data.begin() + end_index;
           iter++) {
        const auto& ins = *iter;
        const auto& feasign_v = ins.uint64_feasigns_;
        for (const auto feasign : feasign_v) {
          uint64_t cur_key = feasign.sign().uint64_feasign_;
          int shard_id = cur_key % thread_keys_shard_num_;
          this->thread_keys_[i][shard_id].insert(cur_key);
        }
      }
    };
    for (int i = 0; i < thread_keys_thread_num_; i++) {
      threads.push_back(
          std::thread(gen_func,
                      std::ref(vec_data),
                      begin,
                      begin + len_per_thread + (i < remain ? 1 : 0),
                      i));
      begin += len_per_thread + (i < remain ? 1 : 0);
    }
    for (std::thread& t : threads) {
      t.join();
    }
    timeline.Pause();
    VLOG(0) << "GpuPs build task cost " << timeline.ElapsedSec() << " seconds.";
  }

  timeline.Start();

  threads.clear();
  // merge thread_keys to shard_keys
  auto merge_ins_dynamic_mf_func = [this, gpu_task](int shard_num, int dim_id) {
    for (int i = 0; i < thread_keys_thread_num_; ++i) {
      gpu_task->batch_add_keys(
          shard_num, dim_id, thread_dim_keys_[i][shard_num][dim_id]);
      thread_dim_keys_[i][shard_num][dim_id].clear();
    }
  };
  for (int i = 0; i < thread_keys_shard_num_; ++i) {
    for (int j = 0; j < multi_mf_dim_; j++) {
      threads.push_back(std::thread(merge_ins_dynamic_mf_func, i, j));
    }
  }
  for (auto& t : threads) {
    t.join();
  }
  timeline.Pause();

  VLOG(0) << "GpuPs task add keys cost " << timeline.ElapsedSec()
          << " seconds.";
  timeline.Start();
  gpu_task->UniqueKeys();
  timeline.Pause();

  VLOG(0) << "GpuPs task unique cost " << timeline.ElapsedSec() << " seconds.";
  for (int i = 0; i < thread_keys_shard_num_; i++) {
    for (int j = 0; j < multi_mf_dim_; j++) {
      VLOG(0) << "GpuPs shard: " << i << "mf dim: " << index_dim_vec_[j]
              << " key len: " << gpu_task->feature_dim_keys_[i][j].size();
      gpu_task->value_dim_ptr_[i][j].resize(
          gpu_task->feature_dim_keys_[i][j].size());
    }
  }
}

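// BuildPull: pull the value pointers for all deduplicated keys from the CPU
// parameter server (PSLIB or PSCORE) shard by shard, then scatter the
// keys/value-pointers to per-device buckets for the GPU tables.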
void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
  platform::Timer timeline;
  std::vector<std::future<void>> task_futures;
  int device_num = heter_devices_.size();
  auto& local_keys = gpu_task->feature_keys_;
  auto& local_ptr = gpu_task->value_ptr_;

  auto& local_dim_keys = gpu_task->feature_dim_keys_;
  auto& local_dim_ptr = gpu_task->value_dim_ptr_;

  auto& device_keys = gpu_task->device_keys_;
  auto& device_vals = gpu_task->device_values_;
  auto& device_dim_keys = gpu_task->device_dim_keys_;
  auto& device_dim_ptr = gpu_task->device_dim_ptr_;
  auto& device_dim_mutex = gpu_task->dim_mutex_;

  for (size_t dev = 0; dev < device_dim_keys.size(); dev++) {
    device_dim_keys[dev].resize(multi_mf_dim_);
    device_dim_ptr[dev].resize(multi_mf_dim_);
  }

  // auto& device_mutex = gpu_task->mutex_;

  std::vector<std::thread> threads(thread_keys_shard_num_);
#ifdef PADDLE_WITH_PSLIB
  auto fleet_ptr = FleetWrapper::GetInstance();
#endif
#ifdef PADDLE_WITH_PSCORE
  auto fleet_ptr = paddle::distributed::FleetWrapper::GetInstance();
#endif

#if (defined PADDLE_WITH_PSLIB) && (defined PADDLE_WITH_HETERPS)
  // get day_id: day nums from 1970
  struct std::tm b;
  b.tm_year = year_ - 1900;
  b.tm_mon = month_ - 1;
  b.tm_mday = day_;
  b.tm_min = b.tm_hour = b.tm_sec = 0;
  std::time_t seconds_from_1970 = std::mktime(&b);
  int day_id = seconds_from_1970 / 86400;
  fleet_ptr->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id);
#endif

  timeline.Start();

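  // Pull one (shard, dim) bucket of value pointers from the CPU table,
  // retrying a failed pull a few times before aborting the process.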
  auto ptl_dynamic_mf_func =
      [this, &local_dim_keys, &local_dim_ptr, &fleet_ptr](int i, int j) {
        size_t key_size = local_dim_keys[i][j].size();
        int32_t status = -1;
        int32_t cnt = 0;
#ifdef PADDLE_WITH_PSLIB
        while (true) {
          auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
              i,
              reinterpret_cast<char**>(local_dim_ptr[i][j].data()),
              this->table_id_,
              local_dim_keys[i][j].data(),
              key_size);
          bool flag = true;

          tt.wait();

          try {
            status = tt.get();
          } catch (const std::future_error& e) {
            VLOG(0) << "Caught a future_error with code" << e.code()
                    << ", Message:" << e.what();
          }
          if (status != 0) {
            VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
            sleep(sleep_seconds_before_fail_exit_);
            flag = false;
            cnt++;
          }
          if (cnt > 3) {
            VLOG(0) << "fleet pull sparse failed, retry 3 times";
            exit(-1);
          }

          if (flag) {
            break;
          }
        }
#endif
#ifdef PADDLE_WITH_PSCORE
        while (true) {
          auto tt = fleet_ptr->worker_ptr_->PullSparsePtr(
              reinterpret_cast<char**>(local_dim_ptr[i][j].data()),
              this->table_id_,
              local_dim_keys[i][j].data(),
              key_size);
          bool flag = true;

          tt.wait();

          try {
            status = tt.get();
          } catch (const std::future_error& e) {
            VLOG(0) << "Caught a future_error with code" << e.code()
                    << ", Message:" << e.what();
          }
          if (status != 0) {
            VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
            sleep(sleep_seconds_before_fail_exit_);
            flag = false;
            cnt++;
          }
          if (cnt > 3) {
            VLOG(0) << "fleet pull sparse failed, retry 3 times";
            exit(-1);
          }

          if (flag) {
            break;
          }
        }
#endif
        if (status != 0) {
          LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]";
          sleep(300);
          exit(-1);
        } else {
          VLOG(0) << "FleetWrapper Pull sparse to local done with table size: "
                  << local_dim_keys[i][j].size();
        }
      };

  threads.resize(thread_keys_shard_num_ * multi_mf_dim_);
  for (int i = 0; i < thread_keys_shard_num_; i++) {
    for (int j = 0; j < multi_mf_dim_; j++) {
      task_futures.emplace_back(
          pull_thread_pool_[i]->enqueue(ptl_dynamic_mf_func, i, j));
    }
  }
  for (auto& f : task_futures) {
    f.wait();
  }
  task_futures.clear();
  timeline.Pause();
  VLOG(0) << "pull sparse from CpuPS into GpuPS cost " << timeline.ElapsedSec()
          << " seconds.";
  if (multi_node_) {
    auto gloo_wrapper = paddle::framework::GlooWrapper::GetInstance();
    if (!gloo_wrapper->IsInitialized()) {
      VLOG(0) << "GLOO is not inited";
      gloo_wrapper->Init();
    }
    gloo_wrapper->Barrier();
  }

  timeline.Start();
  std::vector<std::vector<std::pair<uint64_t, char*>>> pass_values;

  bool record_status = false;
  auto& device_task_keys = gpu_task->device_task_keys_;
  auto& device_task_ptrs = gpu_task->device_task_ptr_;
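  // Scatter one (shard, dim) bucket to its owning device (key % device_num),
  // appending to the device buckets under the per-device / per-dim mutex.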
  auto build_pull_dynamic_mf_func = [this,
                                     device_num,
                                     &local_dim_keys,
                                     &local_dim_ptr,
                                     &device_dim_keys,
                                     &device_dim_ptr,
                                     &device_dim_mutex](int i, int j) {
    std::vector<std::vector<FeatureKey>> task_keys(device_num);
#ifdef PADDLE_WITH_PSLIB
    std::vector<std::vector<paddle::ps::DownpourFixedFeatureValue*>> task_ptrs(
        device_num);
#endif

#ifdef PADDLE_WITH_PSCORE
    std::vector<std::vector<paddle::distributed::FixedFeatureValue*>> task_ptrs(
        device_num);
#endif
    for (size_t k = 0; k < local_dim_keys[i][j].size(); k++) {
      int shard = local_dim_keys[i][j][k] % device_num;
      task_keys[shard].push_back(local_dim_keys[i][j][k]);
      task_ptrs[shard].push_back(local_dim_ptr[i][j][k]);
    }
    // allocate local keys to devices
    for (int dev = 0; dev < device_num; dev++) {
      device_dim_mutex[dev][j]->lock();
      int len = task_keys[dev].size();
      int cur = device_dim_keys[dev][j].size();
      device_dim_keys[dev][j].resize(device_dim_keys[dev][j].size() + len);
      device_dim_ptr[dev][j].resize(device_dim_ptr[dev][j].size() + len);
      for (int k = 0; k < len; ++k) {
        device_dim_keys[dev][j][cur + k] = task_keys[dev][k];
        device_dim_ptr[dev][j][cur + k] = task_ptrs[dev][k];
      }
      device_dim_mutex[dev][j]->unlock();
    }
  };
  auto build_func = [device_num,
                     record_status,
                     &pass_values,
                     &local_keys,
                     &local_ptr,
                     &device_task_keys,
                     &device_task_ptrs](int i) {
    auto& task_keys = device_task_keys[i];
#ifdef PADDLE_WITH_PSLIB
    auto& task_ptrs = device_task_ptrs[i];
#endif

#ifdef PADDLE_WITH_PSCORE
    auto& task_ptrs = device_task_ptrs[i];
#endif

    for (size_t j = 0; j < local_keys[i].size(); j++) {
      int shard = local_keys[i][j] % device_num;
      task_keys[shard].push_back(local_keys[i][j]);
      task_ptrs[shard].push_back(local_ptr[i][j]);
    }
#ifdef PADDLE_WITH_PSLIB
    if (record_status) {
      size_t local_keys_size = local_keys.size();
      size_t pass_values_size = pass_values.size();
      for (size_t j = 0; j < pass_values_size; j += local_keys_size) {
        auto& shard_values = pass_values[j];
        for (size_t pair_idx = 0; pair_idx < pass_values[j].size();
             pair_idx++) {
          auto& cur_pair = shard_values[pair_idx];
          int shard = cur_pair.first % device_num;
          task_keys[shard].push_back(cur_pair.first);
          task_ptrs[shard].push_back(
              (paddle::ps::DownpourFixedFeatureValue*)cur_pair.second);
        }
      }
    }
#endif
  };
  if (!multi_mf_dim_) {
    for (int i = 0; i < thread_keys_shard_num_; i++) {
      task_futures.emplace_back(hbm_thread_pool_[i]->enqueue(build_func, i));
    }
    for (auto& f : task_futures) {
      f.wait();
    }
    task_futures.clear();
    VLOG(0) << "GpuPs build hbmps done";
  }
  std::vector<std::vector<int>> prefix_sum;
  prefix_sum.resize(device_num);
  for (int i = 0; i < device_num; i++) {
    prefix_sum[i].resize(thread_keys_shard_num_ + 1);
    prefix_sum[i][0] = 0;
  }
  auto calc_prefix_func = [this,
                           &prefix_sum,
                           &device_keys,
                           &device_vals,
                           &device_task_keys](int device_num) {
    for (int j = 0; j < thread_keys_shard_num_; j++) {
      prefix_sum[device_num][j + 1] =
          prefix_sum[device_num][j] + device_task_keys[j][device_num].size();
    }
    device_keys[device_num].resize(
        prefix_sum[device_num][thread_keys_shard_num_]);
    device_vals[device_num].resize(
        prefix_sum[device_num][thread_keys_shard_num_]);
  };
  if (!multi_mf_dim_) {
    for (int i = 0; i < device_num; i++) {
      task_futures.emplace_back(
          hbm_thread_pool_[i]->enqueue(calc_prefix_func, i));
    }
    for (auto& f : task_futures) {
      f.wait();
    }
    task_futures.clear();
  }
  VLOG(0) << "prefix done";
  auto prepare_dev_value_func = [device_num,
                                 &prefix_sum,
                                 &device_keys,
                                 &device_vals,
                                 &device_task_keys,
                                 &device_task_ptrs](int dev, int shard_id) {
    // #ifdef PADDLE_WITH_PSCORE
    //     auto& task_ptrs = device_task_ptrs[shard_id];
    // #endif

#ifdef PADDLE_WITH_PSLIB
    // The PSLIB-only loop below needs the per-shard keys/pointers and the
    // prefix-sum offsets, so define them under the same guard that uses them.
    auto& task_keys = device_task_keys[shard_id];
    auto& task_ptrs = device_task_ptrs[shard_id];
    int len = prefix_sum[dev][shard_id + 1] - prefix_sum[dev][shard_id];
    int cur = prefix_sum[dev][shard_id];
    for (int j = 0; j < len; ++j) {
      device_keys[dev][cur + j] = task_keys[dev][j];
      float* ptr_val = task_ptrs[dev][j]->data();
      FeatureValue& val = device_vals[dev][cur + j];
      size_t dim = task_ptrs[dev][j]->size();

      val.delta_score = ptr_val[1];
      val.show = ptr_val[2];
      val.clk = ptr_val[3];
      val.slot = ptr_val[6];
      val.lr = ptr_val[4];
      val.lr_g2sum = ptr_val[5];
      val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]);

      if (dim > 7) {
        val.mf_size = MF_DIM + 1;
        for (int x = 0; x < val.mf_size; x++) {
          val.mf[x] = ptr_val[x + 7];
        }
      } else {
        val.mf_size = 0;
        for (int x = 0; x < MF_DIM + 1; x++) {
          val.mf[x] = 0;
        }
      }
    }
#endif
    VLOG(3) << "GpuPs build hbmps done";
  };

  if (multi_mf_dim_) {
    for (int i = 0; i < thread_keys_shard_num_; i++) {
      for (int j = 0; j < multi_mf_dim_; j++) {
        threads[i * multi_mf_dim_ + j] =
            std::thread(build_pull_dynamic_mf_func, i, j);
      }
    }
    for (std::thread& t : threads) {
      t.join();
    }
  } else {
    for (int i = 0; i < thread_keys_shard_num_; i++) {
      for (int j = 0; j < device_num; j++) {
        task_futures.emplace_back(
            hbm_thread_pool_[i]->enqueue(prepare_dev_value_func, j, i));
      }
    }
    for (auto& f : task_futures) {
      f.wait();
    }
    task_futures.clear();
  }
  timeline.Pause();
  VLOG(0) << "GpuPs prepare for build hbm cost " << timeline.ElapsedSec()
          << " seconds.";
}

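// BuildGPUTask: size a memory pool per (device, dim) from the pulled keys,
// convert the CPU-side values into the GPU feature-value layout with multiple
// threads, then copy each pool into HBM and build the HeterPs tables.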
void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
  int device_num = heter_devices_.size();
  platform::Timer timeline;
  timeline.Start();

  std::vector<size_t> feature_keys_count(device_num);
  size_t size_max = 0;

  for (int i = 0; i < device_num; i++) {
    for (int j = 0; j < multi_mf_dim_; j++) {
      feature_keys_count[i] += gpu_task->device_dim_ptr_[i][j].size();
      VLOG(1) << i << " card with dynamic mf dim: " << index_dim_vec_[j]
              << " dim index: " << j << " contains feasign nums: "
              << gpu_task->device_dim_ptr_[i][j].size();
    }
    VLOG(1) << i << " card with dynamic mf contains feasign nums total: "
            << feature_keys_count[i];
    size_max = std::max(size_max, feature_keys_count[i]);
  }

  if (HeterPs_) {
    delete HeterPs_;
    HeterPs_ = nullptr;
  }
  if (size_max <= 0) {
    VLOG(0) << "Skip build gpu ps cause feasign nums = " << size_max;
    return;
  }
  std::vector<std::thread> threads(device_num);
  auto accessor_wrapper_ptr =
      GlobalAccessorTransfor::GetInstance().GetAccessorWrapper();
  HeterPs_ = HeterPsBase::get_instance(
      size_max, resource_, fleet_config_, accessor_class_, optimizer_type_);
#ifdef PADDLE_WITH_CUDA
  HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_);
  HeterPs_->set_sparse_sgd(optimizer_config_);
  HeterPs_->set_embedx_sgd(optimizer_config_);
#endif

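  // Stage 1: allocate a host MemoryPool per (device, dim), sized by key count
  // and the accessor's per-feature value size.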
  auto build_dymf_mem_pool = [this, &gpu_task, &accessor_wrapper_ptr](int i,
                                                                      int j) {
    this->HeterPs_->set_multi_mf_dim(multi_mf_dim_, max_mf_dim_);
    int mf_dim = this->index_dim_vec_[j];
    VLOG(0) << "building table: " << i << "with mf dim: " << mf_dim
            << " feature_value_size:"
            << accessor_wrapper_ptr->GetFeatureValueSize(mf_dim);
    size_t feature_value_size =
        accessor_wrapper_ptr->GetFeatureValueSize(mf_dim);
    auto& device_dim_keys = gpu_task->device_dim_keys_[i][j];
    auto& device_dim_ptrs = gpu_task->device_dim_ptr_[i][j];
    size_t len = device_dim_keys.size();
    CHECK(len == device_dim_ptrs.size());
    this->mem_pools_[i * this->multi_mf_dim_ + j] =
        new MemoryPool(len, feature_value_size);
  };
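  // Stage 3: copy the filled MemoryPool into device HBM and build the HeterPs
  // table for this (device, dim) shard, then release the host pool.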
  auto build_dymf_hbm_pool = [this, &gpu_task, &accessor_wrapper_ptr](int i,
                                                                      int j) {
    auto& device_dim_keys = gpu_task->device_dim_keys_[i][j];
    size_t len = device_dim_keys.size();
    int mf_dim = this->index_dim_vec_[j];
    size_t feature_value_size =
        accessor_wrapper_ptr->GetFeatureValueSize(mf_dim);

    auto& mem_pool = this->mem_pools_[i * this->multi_mf_dim_ + j];
    platform::CUDADeviceGuard guard(resource_->dev_id(i));
    this->hbm_pools_[i * this->multi_mf_dim_ + j] = new HBMMemoryPool(mem_pool);
    auto& cur_pool = this->hbm_pools_[i * this->multi_mf_dim_ + j];

    this->HeterPs_->build_ps(i,
                             device_dim_keys.data(),
                             cur_pool->mem(),
                             len,
                             feature_value_size,
                             500000,
                             2);
    if (device_dim_keys.size() > 0) {
      VLOG(3) << "show table: " << i
              << " table kv size: " << device_dim_keys.size()
              << "dim: " << mf_dim << " len: " << len;
      HeterPs_->show_one_table(i);
    }
    delete mem_pool;
  };
  int thread_num = 16;
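  // Stage 2: convert the pulled CPU values into the GPU feature-value layout
  // inside the MemoryPool, using thread_num threads per (device, dim).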
  auto build_dynamic_mf_func = [this,
                                &gpu_task,
                                thread_num,
                                &accessor_wrapper_ptr](int i, int j, int z) {
    // this->HeterPs_->set_multi_mf_dim(multi_mf_dim_, max_mf_dim_);
    int mf_dim = this->index_dim_vec_[j];
    VLOG(0) << "building table: " << i << "with mf dim: " << mf_dim;
    auto& device_dim_keys = gpu_task->device_dim_keys_[i][j];
    auto& device_dim_ptrs = gpu_task->device_dim_ptr_[i][j];
    size_t len = device_dim_keys.size();
    CHECK(len == device_dim_ptrs.size());
    // this->mem_pools_[i * this->multi_mf_dim_ + j] =
    //    new MemoryPool(len, feature_value_size);
    auto& mem_pool = this->mem_pools_[i * this->multi_mf_dim_ + j];

    // ============ add for multi-thread ================
    size_t len_per_thread = len / thread_num;
    size_t remain = len % thread_num;
    size_t left = 0, right = 0;

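    // Thread z handles the half-open slice [left, right); the first `remain`
    // threads take one extra key so the whole [0, len) range is covered.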
    size_t real_len = len_per_thread;
    if ((size_t)z < remain) real_len++;

    if ((size_t)z < remain) {
      left = z * (len_per_thread + 1);
      right = left + real_len;
    } else {
      left = remain * (len_per_thread + 1) + (z - remain) * len_per_thread;
      right = left + real_len;
    }
    // ============ add for multi-thread ================

    for (size_t k = left; k < right; k++) {
#ifdef PADDLE_WITH_PSLIB
      // Assumes the PSLIB pool slot holds the fixed FeatureValue layout (the
      // same layout EndPass reads back), so access it through that struct.
      FeatureValue* val =
          reinterpret_cast<FeatureValue*>(mem_pool->mem_address(k));
      float* ptr_val = device_dim_ptrs[k]->data();
      size_t dim = device_dim_ptrs[k]->size();
      val->delta_score =
          ptr_val[paddle::ps::DownpourCtrDymfAccessor::
                      DownpourCtrDymfFeatureValue::delta_score_index()];
      val->show = ptr_val[paddle::ps::DownpourCtrDymfAccessor::
                              DownpourCtrDymfFeatureValue::show_index()];
      val->clk = ptr_val[paddle::ps::DownpourCtrDymfAccessor::
                             DownpourCtrDymfFeatureValue::click_index()];
      val->slot = int(ptr_val[paddle::ps::DownpourCtrDymfAccessor::
                                  DownpourCtrDymfFeatureValue::slot_index()]);
      val->lr = ptr_val[paddle::ps::DownpourCtrDymfAccessor::
                            DownpourCtrDymfFeatureValue::embed_w_index()];
      val->lr_g2sum =
          ptr_val[paddle::ps::DownpourCtrDymfAccessor::
                      DownpourCtrDymfFeatureValue::embed_g2sum_index()];
      // TODO(xuefeng) set mf_dim while using DownpourCtrDymfAccessor
      ptr_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                  mf_dim_index()] = float(mf_dim);
      val->mf_dim = mf_dim;
      if (dim > 8) {  // CpuPS already expanded the value to mf_dim
        val->mf_size = mf_dim + 1;
        for (int x = 0; x < val->mf_dim + 1; x++) {
          val->mf[x] = ptr_val[x + 8];
        }
      } else {
        val->mf_size = 0;
        for (int x = 0; x < val->mf_dim + 1; x++) {
          val->mf[x] = 0;
        }
      }
#endif
#ifdef PADDLE_WITH_PSCORE
      void* val = mem_pool->mem_address(k);
      accessor_wrapper_ptr->BuildFill(
          val, device_dim_ptrs[k], cpu_table_accessor_, mf_dim);
#endif
    }
  };

  threads.resize(device_num * multi_mf_dim_);
  for (int i = 0; i < device_num; i++) {
    for (int j = 0; j < multi_mf_dim_; j++) {
      threads[i + j * device_num] = std::thread(build_dymf_mem_pool, i, j);
    }
  }

  for (std::thread& t : threads) {
    t.join();
  }
  threads.clear();

  // multi-thread process
  threads.resize(device_num * multi_mf_dim_ * thread_num);
  for (int i = 0; i < device_num; i++) {
    for (int j = 0; j < multi_mf_dim_; j++) {
      for (int k = 0; k < thread_num; k++) {
        threads[(i + j * device_num) * thread_num + k] =
            std::thread(build_dynamic_mf_func, i, j, k);
      }
    }
  }
  for (std::thread& t : threads) {
    t.join();
  }
  threads.clear();
  threads.resize(device_num * multi_mf_dim_);
  for (int i = 0; i < device_num; i++) {
    for (int j = 0; j < multi_mf_dim_; j++) {
      threads[i + j * device_num] = std::thread(build_dymf_hbm_pool, i, j);
    }
  }
  for (std::thread& t : threads) {
    t.join();
  }
  threads.clear();

  timeline.Pause();
  VLOG(0) << "GpuPs build table total costs: " << timeline.ElapsedSec()
          << " s.";
}

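// LoadIntoMemory: load (and optionally shuffle) the dataset, then hand a
// fresh HeterContext to the asynchronous build pipeline via
// data_ready_channel_.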
void PSGPUWrapper::LoadIntoMemory(bool is_shuffle) {
  platform::Timer timer;
  VLOG(3) << "Begin LoadIntoMemory(), dataset[" << dataset_ << "]";
  timer.Start();
  dataset_->LoadIntoMemory();
  timer.Pause();
  VLOG(0) << "LoadIntoMemory cost: " << timer.ElapsedSec() << "s";

  // local shuffle
  if (is_shuffle) {
    dataset_->LocalShuffle();
  }
  InitSlotInfo();
  std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
  gpu_task->Reset();

  data_ready_channel_->Put(gpu_task);

  VLOG(3) << "End LoadIntoMemory(), dataset[" << dataset_ << "]";
}

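// The pass-building pipeline is driven by channels: data_ready_channel_ feeds
// pre_build_thread(), which runs PreBuildTask and forwards the task through
// buildcpu_ready_channel_; BeginPass() later consumes it in build_task().
// Rough per-pass call order from the trainer side (a sketch):
//   start_build_thread();        // once, spawns pre_build_thread()
//   LoadIntoMemory(is_shuffle);  // feeds data_ready_channel_
//   BeginPass();                 // waits for BuildPull + BuildGPUTask
//   ... run the pass (PullSparse / PushSparseGrad from the ops) ...
//   EndPass();                   // dump HBM back to the CPU table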
void PSGPUWrapper::start_build_thread() {
  running_ = true;
  VLOG(3) << "start build CPU ps thread.";
  pre_build_threads_ = std::thread([this] { pre_build_thread(); });
}

void PSGPUWrapper::pre_build_thread() {
  // prebuild: process load_data
  while (running_) {
    std::shared_ptr<HeterContext> gpu_task = nullptr;
    if (!data_ready_channel_->Get(gpu_task)) {
      continue;
    }
    VLOG(3) << "thread PreBuildTask start.";
    platform::Timer timer;
    timer.Start();
    // build cpu ps data process
    PreBuildTask(gpu_task);
    timer.Pause();
    VLOG(0) << "thread PreBuildTask end, cost time: " << timer.ElapsedSec()
            << " s";
    buildcpu_ready_channel_->Put(gpu_task);
  }
  VLOG(3) << "build cpu thread end";
}

void PSGPUWrapper::build_task() {
  // build_task: build_pull + build_gputask
  std::shared_ptr<HeterContext> gpu_task = nullptr;
  // train end, gpu free
  if (!gpu_free_channel_->Get(gpu_task)) {
    return;
  }
  // ins and pre_build end
  if (!buildcpu_ready_channel_->Get(gpu_task)) {
    return;
  }

  VLOG(0) << "BuildPull start.";
  platform::Timer timer;
  timer.Start();
  BuildPull(gpu_task);
  BuildGPUTask(gpu_task);
  timer.Pause();
  VLOG(0) << "BuildPull + BuildGPUTask end, cost time: " << timer.ElapsedSec()
          << "s";

  current_task_ = gpu_task;
}

void PSGPUWrapper::BeginPass() {
  platform::Timer timer;
  timer.Start();
  if (current_task_) {
    PADDLE_THROW(
        platform::errors::Fatal("[BeginPass] current task is not ended."));
  }

  build_task();
  timer.Pause();

  if (current_task_ == nullptr) {
    PADDLE_THROW(platform::errors::Fatal(
        "[BeginPass] after build_task, current task is not null."));
  }

  VLOG(0) << "BeginPass end, cost time: " << timer.ElapsedSec() << "s";
}

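// EndPass: copy every device HBM pool back into the CPU-side feature values
// (multi-threaded per device / dim), release the pools and return the
// HeterContext so the next pass can start.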
void PSGPUWrapper::EndPass() {
  if (!current_task_) {
    PADDLE_THROW(
        platform::errors::Fatal("[EndPass] current task has been ended."));
  }
  platform::Timer timer;
  timer.Start();
  size_t keysize_max = 0;
  // in case of feasign_num = 0, skip dump_to_cpu

  for (size_t i = 0; i < heter_devices_.size(); i++) {
    for (int j = 0; j < multi_mf_dim_; j++) {
      keysize_max =
          std::max(keysize_max, current_task_->device_dim_keys_[i][j].size());
    }
  }
  int thread_num = 8;
  auto accessor_wrapper_ptr =
      GlobalAccessorTransfor::GetInstance().GetAccessorWrapper();
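  // Copy one thread's slice of a (device, dim) HBM pool back to host memory
  // and write the values into the corresponding CPU table entries.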
  auto dump_pool_to_cpu_func = [this, thread_num, &accessor_wrapper_ptr](
                                   int i, int j, int z) {
    PADDLE_ENFORCE_GPU_SUCCESS(cudaSetDevice(this->resource_->dev_id(i)));
    auto& hbm_pool = this->hbm_pools_[i * this->multi_mf_dim_ + j];
    auto& device_keys = this->current_task_->device_dim_keys_[i][j];
    size_t len = device_keys.size();
    // ====== multi-thread process feasign================
    int len_per_thread = len / thread_num;
    int remain = len % thread_num;
    int left = -1, right = -1;
    int real_len = len_per_thread;
    if (z < remain) real_len++;
    if (z < remain) {
      left = z * (len_per_thread + 1);
      right = left + real_len;
    } else {
      left = remain * (len_per_thread + 1) + (z - remain) * len_per_thread;
      right = left + real_len;
    }
    // ============ multi-thread process feasign============
    int mf_dim = this->index_dim_vec_[j];
    size_t feature_value_size =
        accessor_wrapper_ptr->GetFeatureValueSize(mf_dim);
    VLOG(0) << "dump pool to cpu table: " << i << "with mf dim: " << mf_dim
            << " key_len :" << len
            << " feature_value_size:" << feature_value_size;
    char* test_build_values = (char*)malloc(feature_value_size * real_len);
    uint64_t offset = left * feature_value_size;
    cudaMemcpy(test_build_values,
               hbm_pool->mem() + offset,
               feature_value_size * real_len,
               cudaMemcpyDeviceToHost);
    CHECK(len == hbm_pool->capacity());
    uint64_t unuse_key = std::numeric_limits<uint64_t>::max();
    for (int i = left; i < right; ++i) {
      if (device_keys[i] == unuse_key) {
        continue;
      }
      size_t local_offset = (i - left) * feature_value_size;
#ifdef PADDLE_WITH_PSLIB
      // The PSLIB build path wrote the fixed FeatureValue layout into the
      // pool, so read the copied bytes back through that struct.
      FeatureValue* gpu_val =
          reinterpret_cast<FeatureValue*>(test_build_values + local_offset);
      auto* downpour_value =
          (paddle::ps::DownpourFixedFeatureValue*)(gpu_val->cpu_ptr);
      int downpour_value_size = downpour_value->size();
      if (gpu_val->mf_size > 0 && downpour_value_size == 8) {
        downpour_value->resize(gpu_val->mf_dim + 1 + downpour_value_size);
      }
      float* cpu_val = downpour_value->data();
      cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                  delta_score_index()] = gpu_val->delta_score;
      cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                  show_index()] = gpu_val->show;
      cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                  click_index()] = gpu_val->clk;
      cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                  embed_w_index()] = gpu_val->lr;
      cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                  embed_g2sum_index()] = gpu_val->lr_g2sum;
      cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                  slot_index()] = gpu_val->slot;
      if (gpu_val->mf_size > 0) {
        for (int x = 0; x < gpu_val->mf_dim + 1; x++) {
          cpu_val[x + 8] = gpu_val->mf[x];
        }
      }
#endif
#ifdef PADDLE_WITH_PSCORE
      float* gpu_val =
          reinterpret_cast<float*>(test_build_values + local_offset);
      accessor_wrapper_ptr->DumpFill(gpu_val, cpu_table_accessor_, mf_dim);
#endif
    }
    free(test_build_values);
  };
  if (multi_mf_dim_) {
    VLOG(0) << "psgpu wrapper dump pool: multi_mf_dim_: " << multi_mf_dim_;
    size_t device_num = heter_devices_.size();
    std::vector<std::thread> threads(device_num * multi_mf_dim_ * thread_num);
    for (size_t i = 0; i < device_num; i++) {
      for (int j = 0; j < multi_mf_dim_; j++) {
        for (int k = 0; k < thread_num; k++) {
          threads[(i + j * device_num) * thread_num + k] =
              std::thread(dump_pool_to_cpu_func, i, j, k);
        }
      }
    }
    for (std::thread& t : threads) {
      t.join();
    }
  }
  if (keysize_max != 0) {
    HeterPs_->end_pass();
  }

  for (size_t i = 0; i < hbm_pools_.size(); i++) {
    delete hbm_pools_[i];
  }
  gpu_task_pool_.Push(current_task_);
  current_task_ = nullptr;
  gpu_free_channel_->Put(current_task_);
  timer.Pause();
  VLOG(1) << "EndPass end, cost time: " << timer.ElapsedSec() << "s";
}

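// PullSparse: copy the slot keys to the device, look them up in the HeterPs
// table and scatter the pulled values into the per-slot output tensors. The
// overload without per-slot dims only logs a warning and is kept for
// compatibility.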
void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
                              const int table_id,
                              const std::vector<const uint64_t*>& keys,
                              const std::vector<float*>& values,
                              const std::vector<int64_t>& slot_lengths,
                              const int hidden_size) {
  VLOG(0) << "Warning:: recommand use pull_gpups_sparse op instead. This "
             "PullSparse is not used.";
}

void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
                              const int table_id,
                              const std::vector<const uint64_t*>& keys,
                              const std::vector<float*>& values,
                              const std::vector<int64_t>& slot_lengths,
                              const std::vector<int>& slot_dim,
                              const int hidden_size) {
  VLOG(3) << "Begine Gpu Ps PullSparse";
  platform::Timer all_timer;
  platform::Timer pull_gpups_timer;
  all_timer.Start();
  size_t total_length =
      std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL);
  size_t feature_value_size = 0;

  auto accessor_wrapper_ptr =
      GlobalAccessorTransfor::GetInstance().GetAccessorWrapper();
  feature_value_size = accessor_wrapper_ptr->GetFeatureValueSize(max_mf_dim_);
  VLOG(3) << "PullSparse max_dim:" << max_mf_dim_
          << " feature_value_size:" << feature_value_size;

#ifdef PADDLE_WITH_CUDA
  VLOG(3) << "Begine Gpu Ps PullSparse";
  auto buf = memory::Alloc(place, total_length * feature_value_size);
  float* total_values_gpu = reinterpret_cast<float*>(buf->ptr());
#endif
#ifdef PADDLE_WITH_XPU_KP
  VLOG(3) << "Begine Xpu Ps PullSparse";
  FeatureValue* total_values_gpu = nullptr;
  xpu_malloc(reinterpret_cast<void**>(&total_values_gpu),
             total_length * feature_value_size);
#endif
  if (platform::is_cpu_place(place)) {
    PADDLE_THROW(platform::errors::Unimplemented(
        "Warning:: CPUPlace is not supported in GpuPs now."));
  } else if (platform::is_gpu_place(place)) {
    VLOG(3) << "Begin copy keys, key_num[" << total_length << "]";
    int device_id = place.GetDeviceId();
    int devid_2_index = HeterPs_->get_index_by_devid(device_id);
    LoDTensor& total_keys_tensor = keys_tensor[devid_2_index];
    uint64_t* total_keys =
        reinterpret_cast<uint64_t*>(total_keys_tensor.mutable_data<int64_t>(
            {int64_t(total_length), 1}, place));

    // construct slot_level lod info
    auto slot_lengths_lod = slot_lengths;
    for (size_t i = 1; i < slot_lengths_lod.size(); i++) {
      slot_lengths_lod[i] += slot_lengths_lod[i - 1];
    }
    auto buf_key = memory::Alloc(place, keys.size() * sizeof(uint64_t*));
    auto buf_length =
        memory::Alloc(place, slot_lengths.size() * sizeof(int64_t));
    uint64_t** gpu_keys = reinterpret_cast<uint64_t**>(buf_key->ptr());
    int64_t* gpu_len = reinterpret_cast<int64_t*>(buf_length->ptr());
    cudaMemcpy(gpu_keys,
               keys.data(),
               keys.size() * sizeof(uint64_t*),
               cudaMemcpyHostToDevice);
    cudaMemcpy(gpu_len,
               slot_lengths_lod.data(),
               slot_lengths.size() * sizeof(int64_t),
               cudaMemcpyHostToDevice);

    auto buf_dim = memory::Alloc(place, slot_dim.size() * sizeof(int));
    int* gpu_dim = reinterpret_cast<int*>(buf_dim->ptr());
    cudaMemcpy(gpu_dim,
               slot_dim.data(),
               slot_dim.size() * sizeof(int),
               cudaMemcpyHostToDevice);

    this->CopyKeys(place,
                   gpu_keys,
                   total_keys,
                   gpu_len,
                   static_cast<int>(slot_lengths.size()),
                   static_cast<int>(total_length));
    VLOG(3) << "Begin call PullSparseGPU in GPUPS, dev: " << devid_2_index
            << " len: " << total_length;

    pull_gpups_timer.Start();
    HeterPs_->pull_sparse(
        devid_2_index, total_keys, total_values_gpu, total_length);

    VLOG(3) << "Begin Copy result to tensor, total_length[" << total_length
            << "]";

    accessor_wrapper_ptr->CopyForPull(place,
                                      gpu_keys,
                                      values,
                                      total_values_gpu,
                                      gpu_len,
                                      static_cast<int>(slot_lengths.size()),
                                      hidden_size,
                                      total_length,
                                      gpu_dim,
                                      val_type_size_);

    pull_gpups_timer.Pause();

  } else if (platform::is_xpu_place(place)) {
#ifdef PADDLE_WITH_XPU_KP
    VLOG(3) << "Begin copy keys, key_num[" << total_length << "]";
    int device_id = place.GetDeviceId();
    int devid_2_index = HeterPs_->get_index_by_devid(device_id);
    LoDTensor& total_keys_tensor = keys_tensor[devid_2_index];
    uint64_t* total_keys = reinterpret_cast<uint64_t*>(
        total_keys_tensor.mutable_data<int64_t>({total_length, 1}, place));

    // construct slot_level lod info
    auto slot_lengths_lod = slot_lengths;
    for (size_t i = 1; i < slot_lengths_lod.size(); i++) {
      slot_lengths_lod[i] += slot_lengths_lod[i - 1];
    }

    auto buf_key = memory::Alloc(place, keys.size() * sizeof(uint64_t*));
    auto buf_length =
        memory::Alloc(place, slot_lengths.size() * sizeof(int64_t));
    uint64_t** xpu_keys = reinterpret_cast<uint64_t**>(buf_key->ptr());
    int64_t* xpu_len = reinterpret_cast<int64_t*>(buf_length->ptr());
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_memcpy(xpu_keys,
                                          keys.data(),
                                          keys.size() * sizeof(uint64_t*),
                                          XPU_HOST_TO_DEVICE));
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_memcpy(xpu_len,
                                          slot_lengths_lod.data(),
                                          slot_lengths.size() * sizeof(int64_t),
                                          XPU_HOST_TO_DEVICE));

    this->CopyKeys(place,
                   xpu_keys,
                   total_keys,
                   xpu_len,
                   static_cast<int>(slot_lengths.size()),
                   static_cast<int>(total_length));
    VLOG(3) << "Begin call PullSparseGPU in GPUPS, dev: " << devid_2_index
            << " len: " << total_length;
    pull_gpups_timer.Start();
    HeterPs_->pull_sparse(devid_2_index,
                          total_keys,
                          total_values_gpu,
                          static_cast<int>(total_length));
    pull_gpups_timer.Pause();

    VLOG(3) << "Begin Copy result to tensor, total_length[" << total_length
            << "]";
    accessor_wrapper_ptr->CopyForPull(place,
                                      xpu_keys,
                                      values,
                                      total_values_gpu,
                                      xpu_len,
                                      static_cast<int>(slot_lengths.size()),
                                      hidden_size,
                                      total_length,
                                      val_type_size_);
#endif
  } else {
    PADDLE_THROW(platform::errors::PreconditionNotMet(
        "GpuPs/XpuPs: PullSparse Only Support CUDAPlace or XPUPlace Now."));
  }
  all_timer.Pause();
  VLOG(3) << "GpuPs PullSparse total costs: " << all_timer.ElapsedSec()
          << " s, of which GPUPS costs: " << pull_gpups_timer.ElapsedSec()
          << " s";
  VLOG(3) << "End PullSparse";
}

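// PushSparseGrad: gather the per-slot gradients into one contiguous device
// buffer in the push-value layout and hand it to HeterPs_->push_sparse.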
void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place,
                                  const int table_id,
                                  const std::vector<const uint64_t*>& keys,
                                  const std::vector<const float*>& grad_values,
                                  const std::vector<int64_t>& slot_lengths,
                                  const int hidden_size,
                                  const int batch_size) {
  platform::Timer all_timer;
  platform::Timer push_gpups_timer;
  all_timer.Start();
  int64_t total_length =
      std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL);
  // #ifdef PADDLE_WITH_CUDA
  VLOG(3) << "Begin GPUPS PushSparseGrad";
  auto accessor_wrapper_ptr =
      GlobalAccessorTransfor::GetInstance().GetAccessorWrapper();
  size_t grad_value_size = accessor_wrapper_ptr->GetPushValueSize(max_mf_dim_);
  auto buf = memory::Alloc(place, total_length * grad_value_size);
  VLOG(3) << "Push Sparse Max mf dimention: " << max_mf_dim_
          << "grad_value_size:" << grad_value_size;
  float* total_grad_values_gpu = reinterpret_cast<float*>(buf->ptr());
  if (platform::is_cpu_place(place)) {
    PADDLE_THROW(platform::errors::Unimplemented(
        "Warning:: CPUPlace is not supported in GPUPS now."));
  } else if (platform::is_gpu_place(place)) {
#ifdef PADDLE_WITH_CUDA
    int device_id = place.GetDeviceId();
    int devid_2_index = HeterPs_->get_index_by_devid(device_id);
    LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index];
    uint64_t* total_keys =
        reinterpret_cast<uint64_t*>(cached_total_keys_tensor.data<int64_t>());
    VLOG(3) << "Begin copy grad tensor to gpups struct";
    accessor_wrapper_ptr->CopyForPush(place,
                                      grad_values,
                                      total_grad_values_gpu,
                                      slot_lengths,
                                      total_length,
                                      batch_size,
                                      grad_value_size,
                                      slot_vector_,
                                      slot_mf_dim_vector_);

    VLOG(3) << "Begin call PushSparseGPU in GPUPS, dev: " << devid_2_index
            << " len: " << total_length;
    push_gpups_timer.Start();
    HeterPs_->push_sparse(devid_2_index,
                          total_keys,
                          total_grad_values_gpu,
                          static_cast<int>(total_length));
    push_gpups_timer.Pause();
#endif
  } else if (platform::is_xpu_place(place)) {
#ifdef PADDLE_WITH_XPU_KP
    int device_id = place.GetDeviceId();
    int devid_2_index = HeterPs_->get_index_by_devid(device_id);
    LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index];
    uint64_t* total_keys =
        reinterpret_cast<uint64_t*>(cached_total_keys_tensor.data<int64_t>());
    VLOG(3) << "Begin copy grad tensor to xpups struct";
    accessor_wrapper_ptr->CopyForPush(place,
                                      grad_values,
                                      total_grad_values_gpu,
                                      slot_lengths,
                                      hidden_size,
                                      total_length,
                                      batch_size,
                                      slot_vector_);

    VLOG(3) << "Begin call PushSparseXPU in XPUPS, dev: " << devid_2_index
            << " len: " << total_length;
    push_gpups_timer.Start();
    HeterPs_->push_sparse(devid_2_index,
                          total_keys,
                          total_grad_values_gpu,
                          static_cast<int>(total_length));
    push_gpups_timer.Pause();
#endif
  } else {
    PADDLE_THROW(platform::errors::PreconditionNotMet(
        "GPUPS: PushSparseGrad Only Support CUDAPlace Now."));
  }
  all_timer.Pause();
  time_3 += all_timer.ElapsedSec();
  time_4 += push_gpups_timer.ElapsedSec();
  VLOG(3) << "PushSparseGrad total cost: " << all_timer.ElapsedSec()
          << " s, of which GPUPS cost: " << push_gpups_timer.ElapsedSec()
          << " s";
  VLOG(3) << "End PushSparseGrad";
}

}  // end namespace framework
}  // end namespace paddle
#endif