/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once
#ifdef PADDLE_WITH_HETERPS

#include <atomic>
#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#ifdef PADDLE_WITH_GLOO
#include <gloo/broadcast.h>

#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#endif
#include "paddle/fluid/distributed/ps/thirdparty/round_robin.h"
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/framework/fleet/heter_context.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h"
#include "paddle/fluid/framework/heter_util.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/framework/fleet/heter_ps/mem_pool.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/dynload/nccl.h"
#endif
#ifdef PADDLE_WITH_XPU_KP
#include "paddle/fluid/platform/device/xpu/enforce_xpu.h"
#endif
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/platform/macros.h"  // for DISABLE_COPY_AND_ASSIGN
#include "paddle/fluid/platform/place.h"
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h"
#include "paddle/fluid/distributed/ps/wrapper/fleet.h"
#include "paddle/fluid/distributed/the_one_ps.pb.h"
#endif
#ifdef PADDLE_WITH_PSLIB
#include "afs_api.h"  // NOLINT
#endif
#ifdef PADDLE_WITH_PSLIB
#include "downpour_accessor.h"  // NOLINT
#endif
#include "paddle/fluid/framework/fleet/heter_ps/log_patch.h"

namespace paddle {
namespace framework {

class Dataset;

#ifdef PADDLE_WITH_PSLIB
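// A thin wrapper around paddle::ps::AfsApiWrapper exposing basic AFS
// file-system operations (remove/mkdir/list/exist/upload/download/...).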
class AfsWrapper {
 public:
  AfsWrapper() {}
  virtual ~AfsWrapper() {}
  void init(const std::string& fs_name,
            const std::string& fs_user,
            const std::string& pass_wd,
            const std::string& conf);
  int remove(const std::string& path);
  int mkdir(const std::string& path);
  std::vector<std::string> list(const std::string& path);

  int exist(const std::string& path);
  int upload(const std::string& local_file, const std::string& afs_file);

  int download(const std::string& local_file, const std::string& afs_file);

  int touchz(const std::string& path);
  std::string cat(const std::string& path);
  int mv(const std::string& old_path, const std::string& dest_path);

 private:
  paddle::ps::AfsApiWrapper afs_handler_;
};
#endif

class PSGPUWrapper {
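  // A lazily (re)allocated device buffer cached across passes:
  // mutable_data() only grows the underlying allocation when the requested
  // size exceeds the current capacity.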
  class DCacheBuffer {
   public:
    DCacheBuffer() : buf_(nullptr) {}
    ~DCacheBuffer() {}
    /**
     * @Brief get data
     */
    template <typename T>
    T* mutable_data(const size_t total_bytes,
                    const paddle::platform::Place& place) {
      if (buf_ == nullptr) {
        buf_ = memory::AllocShared(place, total_bytes);
      } else if (buf_->size() < total_bytes) {
        buf_.reset();
        buf_ = memory::AllocShared(place, total_bytes);
      }
      return reinterpret_cast<T*>(buf_->ptr());
    }
    template <typename T>
    T* data() {
      return reinterpret_cast<T*>(buf_->ptr());
    }
    size_t memory_size() {
      if (buf_ == nullptr) {
        return 0;
      }
      return buf_->size();
    }
    bool IsInitialized(void) { return (buf_ != nullptr); }

   private:
    std::shared_ptr<memory::Allocation> buf_ = nullptr;
  };
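  // Per-device scratch buffers reused by the (dedup) pull/push path.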
  struct PSDeviceData {
    DCacheBuffer keys_tensor;
    DCacheBuffer dims_tensor;
    DCacheBuffer keys_ptr_tensor;
    DCacheBuffer values_ptr_tensor;
    DCacheBuffer pull_push_tensor;

    DCacheBuffer slot_lens;
    DCacheBuffer d_slot_vector;
    DCacheBuffer keys2slot;

    int64_t total_key_length = 0;
    int64_t dedup_key_length = 0;
  };
  PSDeviceData* device_caches_ = nullptr;

 public:
  ~PSGPUWrapper();

  PSGPUWrapper() {
    HeterPs_ = NULL;
    sleep_seconds_before_fail_exit_ = 300;
  }

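  // Pull embedding values for the given keys into `values`; the first
  // overload supports per-slot embedding dims (slot_dim), the second assumes
  // a uniform hidden_size for every slot.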
  void PullSparse(const paddle::platform::Place& place,
                  const int table_id,
                  const std::vector<const uint64_t*>& keys,
                  const std::vector<float*>& values,
                  const std::vector<int64_t>& slot_lengths,
                  const std::vector<int>& slot_dim,
                  const int hidden_size);
  void PullSparse(const paddle::platform::Place& place,
                  const int table_id,
                  const std::vector<const uint64_t*>& keys,
                  const std::vector<float*>& values,
                  const std::vector<int64_t>& slot_lengths,
                  const int hidden_size);
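  // Push the gradients from the backward pass back into the device tables.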
  void PushSparseGrad(const paddle::platform::Place& place,
                      const int table_id,
                      const std::vector<const uint64_t*>& keys,
                      const std::vector<const float*>& grad_values,
                      const std::vector<int64_t>& slot_lengths,
                      const int hidden_size,
                      const int batch_size);
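  // Copy the per-slot key arrays into one contiguous device buffer of
  // total_len keys.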
  void CopyKeys(const paddle::platform::Place& place,
                uint64_t** origin_keys,
                uint64_t* total_keys,
                const int64_t* gpu_len,
                int slot_num,
                int total_len);
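  // Overload that additionally records, for every flattened key, the index
  // of the slot it came from (key2slot); used by the dedup pull path.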
  void CopyKeys(const paddle::platform::Place& place,
                uint64_t** origin_keys,
                uint64_t* total_keys,
                const int64_t* gpu_len,
                int slot_num,
                int total_len,
                int* key2slot);

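  // Pass-building pipeline: PreBuildTask gathers and dedups feasign keys,
  // BuildPull fetches their values from the parameter server, and
  // BuildGPUTask fills the per-device HBM tables; start_build_thread() runs
  // the pre-build stage asynchronously.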
  void BuildGPUTask(std::shared_ptr<HeterContext> gpu_task);
  void PreBuildTask(std::shared_ptr<HeterContext> gpu_task);
  void BuildPull(std::shared_ptr<HeterContext> gpu_task);
  void LoadIntoMemory(bool is_shuffle);
  void BeginPass();
  void EndPass();
  void start_build_thread();
  void pre_build_thread();
  void build_task();

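  // Stop the background build pipeline, close the channels, and release the
  // per-device caches.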
  void Finalize() {
    VLOG(3) << "PSGPUWrapper Begin Finalize.";
    if (s_instance_ == nullptr) {
      return;
    }
    data_ready_channel_->Close();
    buildcpu_ready_channel_->Close();
    gpu_free_channel_->Close();
    running_ = false;
    VLOG(3) << "begin stop pre_build_threads_";
    pre_build_threads_.join();
    s_instance_ = nullptr;
    VLOG(3) << "PSGPUWrapper Finalize Finished.";
    HeterPs_->show_table_collisions();
    if (device_caches_ != nullptr) {
      delete[] device_caches_;
      device_caches_ = nullptr;
    }
  }

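  // One-time device setup: binds dev_ids to HeterPsResource, detects
  // multi-node mode via Gloo, creates intra-node (inner) and inter-node
  // (inter) NCCL communicators, opens the pass-building channels, and starts
  // the background build thread.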
  void InitializeGPU(const std::vector<int>& dev_ids) {
    if (s_instance_ != NULL && is_initialized_ == false) {
      VLOG(3) << "PSGPUWrapper Begin InitializeGPU";
      is_initialized_ = true;
      resource_ = std::make_shared<HeterPsResource>(dev_ids);
      resource_->enable_p2p();
      keys_tensor.resize(resource_->total_device());
      device_caches_ = new PSDeviceData[resource_->total_device()];
#ifdef PADDLE_WITH_GLOO
      auto gloo = paddle::framework::GlooWrapper::GetInstance();
      if (gloo->Size() > 1) {
        multi_node_ = 1;
      }
#else
      PADDLE_THROW(platform::errors::Unavailable(
          "heter ps needs to be compiled with GLOO"));
#endif
#ifdef PADDLE_WITH_CUDA
      if (multi_node_) {
        int dev_size = dev_ids.size();
        // init inner comm
        inner_comms_.resize(dev_size);
        inter_ncclids_.resize(dev_size);
        platform::dynload::ncclCommInitAll(
            &(inner_comms_[0]), dev_size, &dev_ids[0]);
// init inter comm
#ifdef PADDLE_WITH_GLOO
        inter_comms_.resize(dev_size);
        if (gloo->Rank() == 0) {
          for (int i = 0; i < dev_size; ++i) {
            platform::dynload::ncclGetUniqueId(&inter_ncclids_[i]);
          }
        }

        PADDLE_ENFORCE_EQ(
            gloo->IsInitialized(),
            true,
            platform::errors::PreconditionNotMet(
                "You must initialize the gloo environment first to use it."));
        gloo::BroadcastOptions opts(gloo->GetContext());
        opts.setOutput(&inter_ncclids_[0], dev_size);
        opts.setRoot(0);
        gloo::broadcast(opts);

        for (int i = 0; i < dev_size; ++i) {
          platform::dynload::ncclCommInitRank(
              &inter_comms_[i], gloo->Size(), inter_ncclids_[i], gloo->Rank());
        }
        node_size_ = gloo->Size();
#else
        PADDLE_THROW(platform::errors::Unavailable(
            "heter ps needs to be compiled with GLOO"));
#endif
      }
#endif
      heter_devices_ = dev_ids;
      data_ready_channel_->Open();
      data_ready_channel_->SetCapacity(3);
      buildcpu_ready_channel_->Open();
      buildcpu_ready_channel_->SetCapacity(3);
      gpu_free_channel_->Open();
      gpu_free_channel_->SetCapacity(1);

      current_task_ = nullptr;
      gpu_free_channel_->Put(current_task_);

      table_id_ = 0;

      // start the background cpu & gpu ps build thread
      start_build_thread();
    }
  }

  void SetSparseSGD(float nonclk_coeff,
                    float clk_coeff,
                    float min_bound,
                    float max_bound,
                    float learning_rate,
                    float initial_g2sum,
                    float initial_range,
                    float beta1_decay_rate,
                    float beta2_decay_rate,
                    float ada_epsilon);
  void SetEmbedxSGD(float mf_create_thresholds,
                    float mf_learning_rate,
                    float mf_initial_g2sum,
                    float mf_initial_range,
                    float mf_min_bound,
                    float mf_max_bound,
                    float mf_beta1_decay_rate,
                    float mf_beta2_decay_rate,
                    float mf_ada_epsilon,
                    float nodeid_slot,
                    float feature_learning_rate);

#ifdef PADDLE_WITH_PSCORE
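  // Flatten one SparseCommonSGDRuleParameter into "config" entries under the
  // given prefix; optimizer_type ids: 0 naive, 1 adagrad, 2 std_adagrad,
  // 3 adam, 4 shared_adam.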
  void add_sparse_optimizer(
      std::unordered_map<std::string, float>& config,  // NOLINT
      const ::paddle::distributed::SparseCommonSGDRuleParameter& sgd_param,
      const std::string& prefix = "") {
    auto optimizer_name = sgd_param.name();
    if (optimizer_name == "SparseNaiveSGDRule") {
      config[prefix + "optimizer_type"] = 0;
      config[prefix + "learning_rate"] = sgd_param.naive().learning_rate();
      config[prefix + "initial_range"] = sgd_param.naive().initial_range();
      config[prefix + "min_bound"] = sgd_param.naive().weight_bounds()[0];
      config[prefix + "max_bound"] = sgd_param.naive().weight_bounds()[1];
    } else if (optimizer_name == "SparseAdaGradSGDRule") {
      config[prefix + "optimizer_type"] = 1;
      config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate();
      config[prefix + "initial_range"] = sgd_param.adagrad().initial_range();
      config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum();
      config[prefix + "min_bound"] = sgd_param.adagrad().weight_bounds()[0];
      config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1];
    } else if (optimizer_name == "StdAdaGradSGDRule") {
      config[prefix + "optimizer_type"] = 2;
      config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate();
      config[prefix + "initial_range"] = sgd_param.adagrad().initial_range();
      config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum();
      config[prefix + "min_bound"] = sgd_param.adagrad().weight_bounds()[0];
      config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1];
    } else if (optimizer_name == "SparseAdamSGDRule") {
      config[prefix + "optimizer_type"] = 3;
      config[prefix + "learning_rate"] = sgd_param.adam().learning_rate();
      config[prefix + "initial_range"] = sgd_param.adam().initial_range();
      config[prefix + "beta1_decay_rate"] = sgd_param.adam().beta1_decay_rate();
      config[prefix + "beta2_decay_rate"] = sgd_param.adam().beta2_decay_rate();
      config[prefix + "ada_epsilon"] = sgd_param.adam().ada_epsilon();
      config[prefix + "min_bound"] = sgd_param.adam().weight_bounds()[0];
      config[prefix + "max_bound"] = sgd_param.adam().weight_bounds()[1];
    } else if (optimizer_name == "SparseSharedAdamSGDRule") {
      config[prefix + "optimizer_type"] = 4;
      config[prefix + "learning_rate"] = sgd_param.adam().learning_rate();
      config[prefix + "initial_range"] = sgd_param.adam().initial_range();
      config[prefix + "beta1_decay_rate"] = sgd_param.adam().beta1_decay_rate();
      config[prefix + "beta2_decay_rate"] = sgd_param.adam().beta2_decay_rate();
      config[prefix + "ada_epsilon"] = sgd_param.adam().ada_epsilon();
      config[prefix + "min_bound"] = sgd_param.adam().weight_bounds()[0];
      config[prefix + "max_bound"] = sgd_param.adam().weight_bounds()[1];
    }
  }

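  // Build the flat GPU PS config from the first downpour sparse table in
  // ps_param, attach its optimizer settings, and hand the result to
  // InitializeGPUServer(config).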
  void InitializeGPUServer(paddle::distributed::PSParameter ps_param) {
    auto sparse_table =
        ps_param.server_param().downpour_server_param().downpour_table_param(0);
    // set build thread_num and shard_num
    thread_keys_thread_num_ = sparse_table.shard_num();
    thread_keys_shard_num_ = sparse_table.shard_num();
    VLOG(1) << "ps_gpu build phase thread_num:" << thread_keys_thread_num_
            << " shard_num:" << thread_keys_shard_num_;

    pull_thread_pool_.resize(thread_keys_shard_num_);
    for (size_t i = 0; i < pull_thread_pool_.size(); i++) {
      pull_thread_pool_[i].reset(new ::ThreadPool(1));
    }
    hbm_thread_pool_.resize(thread_keys_shard_num_);
    for (size_t i = 0; i < hbm_thread_pool_.size(); i++) {
      hbm_thread_pool_[i].reset(new ::ThreadPool(1));
    }

    auto sparse_table_accessor = sparse_table.accessor();
    auto sparse_table_accessor_parameter =
        sparse_table_accessor.ctr_accessor_param();
    accessor_class_ = sparse_table_accessor.accessor_class();

    std::unordered_map<std::string, float> config;
    config["embedx_dim"] = sparse_table_accessor.embedx_dim();
    config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff();
    config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff();
    config["mf_create_thresholds"] = sparse_table_accessor.embedx_threshold();

    config["nodeid_slot"] =
        sparse_table_accessor.graph_sgd_param().nodeid_slot();
    config["feature_learning_rate"] =
        sparse_table_accessor.graph_sgd_param().feature_learning_rate();

    if (accessor_class_ == "CtrDymfAccessor") {
      // optimizer config for embed_w and embedx
      add_sparse_optimizer(config, sparse_table_accessor.embed_sgd_param());
      add_sparse_optimizer(
          config, sparse_table_accessor.embedx_sgd_param(), "mf_");
    }

    fleet_config_ = config;
    GlobalAccessorFactory::GetInstance().Init(accessor_class_);
    GlobalAccessorFactory::GetInstance().GetAccessorWrapper()->Configure(
        config);
    InitializeGPUServer(config);
  }
#endif

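  // Read SGD hyper-parameters from "config", falling back to the defaults
  // below for any key that is absent, and push them into the device-side
  // optimizer configuration.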
  void InitializeGPUServer(std::unordered_map<std::string, float> config) {
    float nonclk_coeff = (config.find("nonclk_coeff") == config.end())
                             ? 1.0
                             : config["nonclk_coeff"];
    float clk_coeff =
        (config.find("clk_coeff") == config.end()) ? 1.0 : config["clk_coeff"];
    float min_bound = (config.find("min_bound") == config.end())
                          ? -10.0
                          : config["min_bound"];
    float max_bound =
        (config.find("max_bound") == config.end()) ? 10.0 : config["max_bound"];
    float learning_rate = (config.find("learning_rate") == config.end())
                              ? 0.05
                              : config["learning_rate"];
    float initial_g2sum = (config.find("initial_g2sum") == config.end())
                              ? 3.0
                              : config["initial_g2sum"];
    float initial_range = (config.find("initial_range") == config.end())
                              ? 1e-4
                              : config["initial_range"];
    float beta1_decay_rate = (config.find("beta1_decay_rate") == config.end())
                                 ? 0.9
                                 : config["beta1_decay_rate"];
    float beta2_decay_rate = (config.find("beta2_decay_rate") == config.end())
                                 ? 0.999
                                 : config["beta2_decay_rate"];
    float ada_epsilon = (config.find("ada_epsilon") == config.end())
                            ? 1e-8
                            : config["ada_epsilon"];
    // mf config settings
    float mf_create_thresholds =
        (config.find("mf_create_thresholds") == config.end())
            ? static_cast<float>(1.0)
            : config["mf_create_thresholds"];
    float mf_learning_rate = (config.find("mf_learning_rate") == config.end())
                                 ? 0.05
                                 : config["mf_learning_rate"];
    float mf_initial_g2sum = (config.find("mf_initial_g2sum") == config.end())
                                 ? 3.0
                                 : config["mf_initial_g2sum"];
    float mf_initial_range = (config.find("mf_initial_range") == config.end())
                                 ? 1e-4
                                 : config["mf_initial_range"];
    float mf_min_bound = (config.find("mf_min_bound") == config.end())
                             ? -10.0
                             : config["mf_min_bound"];
    float mf_max_bound = (config.find("mf_max_bound") == config.end())
                             ? 10.0
                             : config["mf_max_bound"];
    float mf_beta1_decay_rate =
        (config.find("mf_beta1_decay_rate") == config.end())
            ? 0.9
            : config["mf_beta1_decay_rate"];
    float mf_beta2_decay_rate =
        (config.find("mf_beta2_decay_rate") == config.end())
            ? 0.999
            : config["mf_beta2_decay_rate"];
    float mf_ada_epsilon = (config.find("mf_ada_epsilon") == config.end())
                               ? 1e-8
                               : config["mf_ada_epsilon"];

    float feature_learning_rate =
        (config.find("feature_learning_rate") == config.end())
            ? 0.05
            : config["feature_learning_rate"];

    float nodeid_slot = (config.find("nodeid_slot") == config.end())
                            ? 9008
                            : config["nodeid_slot"];

    this->SetSparseSGD(nonclk_coeff,
                       clk_coeff,
                       min_bound,
                       max_bound,
                       learning_rate,
                       initial_g2sum,
                       initial_range,
                       beta1_decay_rate,
                       beta2_decay_rate,
                       ada_epsilon);
    this->SetEmbedxSGD(mf_create_thresholds,
                       mf_learning_rate,
                       mf_initial_g2sum,
                       mf_initial_range,
                       mf_min_bound,
                       mf_max_bound,
                       mf_beta1_decay_rate,
                       mf_beta2_decay_rate,
                       mf_ada_epsilon,
                       nodeid_slot,
                       feature_learning_rate);

    // set optimizer type (0 naive, 1 adagrad, 2 std_adagrad, 3 adam,
    // 4 shared_adam)
    optimizer_type_ = (config.find("optimizer_type") == config.end())
                          ? 1
                          : static_cast<int>(config["optimizer_type"]);

    VLOG(0) << "InitializeGPUServer optimizer_type_:" << optimizer_type_
            << " nodeid_slot:" << nodeid_slot
            << " feature_learning_rate:" << feature_learning_rate;
  }

  void SetDate(int year, int month, int day) {
    year_ = year;
    month_ = month;
    day_ = day;
  }

  void SetDataset(Dataset* dataset) { dataset_ = dataset; }

  // PSGPUWrapper singleton
  static std::shared_ptr<PSGPUWrapper> GetInstance() {
    {
      std::lock_guard<std::mutex> lk(ins_mutex);
      if (NULL == s_instance_) {
        s_instance_.reset(new paddle::framework::PSGPUWrapper());
      }
    }
    return s_instance_;
  }
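  // A rough usage sketch (assumed call order, not enforced by this class):
  //   auto ps = paddle::framework::PSGPUWrapper::GetInstance();
  //   ps->InitializeGPUServer(config);   // or the PSParameter overload
  //   ps->SetSlotVector(slots);
  //   ps->InitializeGPU(dev_ids);        // binds devices, starts threads
  //   ps->LoadIntoMemory(/*is_shuffle=*/true);
  //   ps->BeginPass();  /* train */  ps->EndPass();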
  std::vector<std::unordered_map<uint64_t, std::vector<float>>>& GetLocalTable(
      int table_id) {
    return local_tables_[table_id];
  }
  void SetSlotVector(const std::vector<int>& slot_vector) {
    slot_vector_ = slot_vector;
  }

  void SetSlotOffsetVector(const std::vector<int>& slot_offset_vector) {
    slot_offset_vector_ = slot_offset_vector;
    std::cout << "yxf set: ";
    for (auto s : slot_offset_vector_) {
      std::cout << s << " | ";
    }
    std::cout << " end " << std::endl;
  }

#ifdef PADDLE_WITH_CUDA
  void SetSlotDimVector(const std::vector<int>& slot_mf_dim_vector) {
    slot_mf_dim_vector_ = slot_mf_dim_vector;
    assert(slot_mf_dim_vector_.size() == slot_vector_.size());
  }

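  // Build the slot -> embedding-dim bookkeeping from the dataset's slot
  // list: slot offsets, the sorted set of distinct mf dims, per-dim
  // memory/HBM pools, and the value/grad/pull buffer sizes. Runs once.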
  void InitSlotInfo() {
    if (slot_info_initialized_) {
      return;
    }
    SlotRecordDataset* dataset = reinterpret_cast<SlotRecordDataset*>(dataset_);
    auto slots_vec = dataset->GetSlots();
    slot_offset_vector_.clear();
    for (auto& slot : slot_vector_) {
      for (size_t i = 0; i < slots_vec.size(); ++i) {
        if (std::to_string(slot) == slots_vec[i]) {
          slot_offset_vector_.push_back(i);
          break;
        }
      }
    }
    std::cout << "psgpu wrapper use slots: ";
    for (auto s : slot_offset_vector_) {
      std::cout << s << " | ";
    }
    std::cout << " end " << std::endl;
    for (size_t i = 0; i < slot_mf_dim_vector_.size(); i++) {
      slot_dim_map_[slot_vector_[i]] = slot_mf_dim_vector_[i];
    }

    std::unordered_set<int> dims_set;
    for (auto& it : slot_dim_map_) {
      dims_set.insert(it.second);
    }
    size_t num_of_dim = dims_set.size();
    index_dim_vec_.resize(num_of_dim);
    index_dim_vec_.assign(dims_set.begin(), dims_set.end());
    std::sort(index_dim_vec_.begin(), index_dim_vec_.end());
    std::unordered_map<int, int> dim_index_map;
    for (size_t i = 0; i < num_of_dim; i++) {
      dim_index_map[index_dim_vec_[i]] = i;
    }
    hbm_pools_.resize(resource_->total_device() * num_of_dim);
    mem_pools_.resize(resource_->total_device() * num_of_dim);
    max_mf_dim_ = index_dim_vec_.back();
    multi_mf_dim_ = (dim_index_map.size() >= 1) ? dim_index_map.size() : 0;
    resource_->set_multi_mf(multi_mf_dim_, max_mf_dim_);
    slot_index_vec_.resize(slot_mf_dim_vector_.size());
    for (size_t i = 0; i < slot_index_vec_.size(); i++) {
      slot_index_vec_[i] = dim_index_map[slot_mf_dim_vector_[i]];
    }

    auto accessor_wrapper_ptr =
        GlobalAccessorFactory::GetInstance().GetAccessorWrapper();
    val_type_size_ = accessor_wrapper_ptr->GetFeatureValueSize(max_mf_dim_);
    grad_type_size_ = accessor_wrapper_ptr->GetPushValueSize(max_mf_dim_);
    pull_type_size_ = accessor_wrapper_ptr->GetPullValueSize(max_mf_dim_);
    VLOG(0) << "InitSlotInfo: val_type_size_" << val_type_size_
            << " grad_type_size_:" << grad_type_size_
            << " pull_type_size_:" << pull_type_size_;
    slot_info_initialized_ = true;
  }
#endif

  void ShowOneTable(int index) { HeterPs_->show_one_table(index); }

  int UseAfsApi() { return use_afs_api_; }

#ifdef PADDLE_WITH_PSLIB
  std::shared_ptr<paddle::ps::AfsReader> OpenReader(
      const std::string& filename) {
    return afs_handler_.open_reader(filename);
  }

  std::shared_ptr<paddle::ps::AfsWriter> OpenWriter(
      const std::string& filename) {
    return afs_handler_.open_writer(filename);
  }

  void InitAfsApi(const std::string& fs_name,
                  const std::string& fs_user,
                  const std::string& pass_wd,
                  const std::string& conf);
#endif

#ifdef PADDLE_WITH_PSCORE
  void SetTableAccessor(paddle::distributed::ValueAccessor* accessor) {
    cpu_table_accessor_ = accessor;
  }
#endif

 private:
  static std::shared_ptr<PSGPUWrapper> s_instance_;
  static std::mutex ins_mutex;
  Dataset* dataset_;
#ifdef PADDLE_WITH_PSLIB
  paddle::ps::AfsApiWrapper afs_handler_;
#endif
  std::unordered_map<
      uint64_t,
      std::vector<std::unordered_map<uint64_t, std::vector<float>>>>
      local_tables_;
  HeterPsBase* HeterPs_;
  std::vector<LoDTensor> keys_tensor;  // Cache for pull_sparse
  std::shared_ptr<HeterPsResource> resource_;
  int32_t sleep_seconds_before_fail_exit_;
  std::vector<int> slot_vector_;
  std::vector<int> slot_offset_vector_;
  std::vector<int> slot_mf_dim_vector_;
  std::unordered_map<int, int> slot_dim_map_;
  std::vector<int> slot_index_vec_;
  std::vector<int> index_dim_vec_;
  int multi_mf_dim_{0};
  int max_mf_dim_{0};
  size_t val_type_size_{0};
  size_t grad_type_size_{0};
  size_t pull_type_size_{0};

  double time_1 = 0.0;
  double time_2 = 0.0;
  double time_3 = 0.0;
  double time_4 = 0.0;

  int multi_node_{0};
  int node_size_;
  uint64_t table_id_;
  int gpu_graph_mode_ = 0;
#ifdef PADDLE_WITH_CUDA
  std::vector<ncclComm_t> inner_comms_;
  std::vector<ncclComm_t> inter_comms_;
  std::vector<ncclUniqueId> inter_ncclids_;
#endif
  std::vector<int> heter_devices_;
  std::unordered_set<std::string> gpu_ps_config_keys_;
  HeterObjectPool<HeterContext> gpu_task_pool_;
  std::vector<std::vector<robin_hood::unordered_set<uint64_t>>> thread_keys_;
  std::vector<std::vector<std::vector<robin_hood::unordered_set<uint64_t>>>>
      thread_dim_keys_;
  int thread_keys_thread_num_ = 37;
  int thread_keys_shard_num_ = 37;
  uint64_t max_fea_num_per_pass_ = 5000000000;
  int year_;
  int month_;
  int day_;
  bool slot_info_initialized_ = false;
  int use_afs_api_ = 0;
  int optimizer_type_ = 1;
  std::string accessor_class_;
  std::unordered_map<std::string, float> fleet_config_;
#ifdef PADDLE_WITH_PSCORE
  paddle::distributed::ValueAccessor* cpu_table_accessor_;
#endif

#ifdef PADDLE_WITH_CUDA
  std::vector<MemoryPool*> mem_pools_;
  std::vector<HBMMemoryPool*> hbm_pools_;  // in multi-mf-dim mode, one table
                                           // needs one hbm pool per dim
#endif

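  // Channels coordinating the asynchronous pass-building pipeline; each
  // stage hands a std::shared_ptr<HeterContext> to the next (data load ->
  // CPU pre-build -> GPU build).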
  std::shared_ptr<
      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
      data_ready_channel_ =
          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
  std::shared_ptr<
      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
      buildcpu_ready_channel_ =
          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
  std::shared_ptr<
      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
      gpu_free_channel_ =
          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
  std::shared_ptr<HeterContext> current_task_ = nullptr;
  std::thread pre_build_threads_;
  bool running_ = false;
  std::vector<std::shared_ptr<ThreadPool>> pull_thread_pool_;
  std::vector<std::shared_ptr<ThreadPool>> hbm_thread_pool_;
  OptimizerConfig optimizer_config_;

 protected:
  static bool is_initialized_;
};

}  // end namespace framework
}  // end namespace paddle
#endif