/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once
#include <thread>
#include <vector>
#include "cub/cub.cuh"
#include "cub/util_allocator.cuh"
#if defined(PADDLE_WITH_CUDA)
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/timer.h"
#include "thrust/pair.h"
#elif defined(PADDLE_WITH_XPU_KP)
// #include "paddle/fluid/framework/fleet/heter_ps/optimizer_conf.h"
#include <xpu/runtime.h>
#include "paddle/fluid/platform/device/xpu/enforce_xpu.h"
#endif

#include "paddle/fluid/framework/fleet/heter_ps/hashtable.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_comm_kernel.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h"
#include "paddle/fluid/memory/allocation/allocator.h"
#include "paddle/fluid/memory/memory.h"
#include "paddle/fluid/platform/place.h"

#ifdef PADDLE_WITH_HETERPS

namespace paddle {
namespace framework {

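// Rounds LEN up to the next multiple of ALIGNVAL, which must be a power of
// two: e.g. TYPEALIGN(8, 13) == 16. The add overshoots, then the
// ~(ALIGNVAL - 1) mask clears the low bits.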
#define TYPEALIGN(ALIGNVAL, LEN) \
  (((uint64_t)(LEN) + ((ALIGNVAL)-1)) & ~((uint64_t)((ALIGNVAL)-1)))

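// HeterComm shards a sparse table across the devices managed by
// HeterPsResource: split_input_to_shard routes each key to its owner device,
// merge_grad deduplicates gradients before pushing, and pull_sparse /
// push_sparse move batches along the inter-device paths built by init_path().
//
// A minimal usage sketch (hypothetical call site; the real wiring lives in
// the HeterPs / PSGPUWrapper layer):
//
//   auto resource = std::make_shared<HeterPsResource>(dev_ids);
//   HeterComm<uint64_t, FeatureValue, FeaturePushValue> comm(capacity,
//                                                            resource);
//   comm.build_ps(0, h_keys, h_vals, len, chunk_size, stream_num);
//   comm.pull_sparse(0, d_keys, d_vals, len);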
template <typename KeyType, typename ValType, typename GradType>
class HeterComm {
 public:
  HeterComm(size_t capacity, std::shared_ptr<HeterPsResource> resource);
  virtual ~HeterComm();
  HeterComm(const HeterComm&) = delete;
  HeterComm& operator=(const HeterComm&) = delete;

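  // Routes each of the len keys to its owner device: d_idx_ptr receives the
  // shuffle permutation, and left/right receive each device's segment bounds.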
  void split_input_to_shard(KeyType* d_keys, int* d_idx_ptr, size_t len,
                            int* left, int* right, int gpu_num);
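  // Sorts and deduplicates d_keys, accumulating the gradients of duplicate
  // keys in place; uniq_len reports the number of unique keys.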
  void merge_grad(int gpu_num, KeyType* d_keys, GradType* d_grads, size_t len,
                  int& uniq_len);  // NOLINT
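  // Variable-width counterpart used with multi_mf_dim_, where gradient
  // entries are packed at a runtime-determined byte size.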
  void dynamic_merge_grad(int gpu_num, KeyType* d_keys, GradType* d_grads,
                          size_t len, int& uniq_len);  // NOLINT
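  // pull_sparse gathers the values for d_keys from their owner devices into
  // d_vals; build_ps bulk-loads host keys/values into the per-device hash
  // tables, chunk_size keys at a time across stream_num streams.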
  void pull_sparse(int num, KeyType* d_keys, ValType* d_vals, size_t len);
  void build_ps(int num, KeyType* h_keys, ValType* h_vals, size_t len,
                size_t chunk_size, int stream_num);
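  // Pool-based overload for variable-width values: pool holds packed feature
  // values with a stride of feature_value_size bytes.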
  void build_ps(int num, KeyType* h_keys, char* pool, size_t len,
                size_t feature_value_size, size_t chunk_size, int stream_num);
  void dump();
  void show_one_table(int gpu_num);
  int get_index_by_devid(int devid);

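  // push_sparse scatters gradients back to their owner devices and applies
  // the optimizer: the CUDA build takes an Sgd functor, while the XPU build
  // configures the update through set_sparse_sgd / set_embedx_sgd below.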
#if defined(PADDLE_WITH_CUDA)
  template <typename Sgd>
  void push_sparse(int num, KeyType* d_keys, GradType* d_grads, size_t len,
                   Sgd& sgd);  // NOLINT
#elif defined(PADDLE_WITH_XPU_KP)
  void push_sparse(int num, KeyType* d_keys, GradType* d_grads, size_t len);
#endif

  void set_sparse_sgd(const OptimizerConfig& optimizer_config);
  void set_embedx_sgd(const OptimizerConfig& optimizer_config);

  int log2i(int x);

  template <typename DstPlace, typename SrcPlace, typename StreamType>
  void memory_copy(DstPlace dst_place, void* dst, SrcPlace src_place,
                   const void* src, size_t count, StreamType stream = 0);

#if defined(PADDLE_WITH_CUDA)
  template <typename Sgd>
  void push_sparse_multi_node(int num, KeyType* d_keys, GradType* d_grads,
                              size_t len, Sgd& sgd);  // NOLINT

  template <typename Sgd>
  void update_one_table(int num, KeyType* d_keys, GradType* d_grads, size_t len,
                        Sgd& sgd);  // NOLINT

  int gather_one_node_grad(int num, KeyType* d_keys, GradType* d_grads,
                           int len);

  int gather_multi_node_grad(int num, KeyType* d_keys, GradType* d_grads,
                             int len);

  void set_nccl_comm_and_size(const std::vector<ncclComm_t>& inner_comms,
                              const std::vector<ncclComm_t>& inter_comms,
                              int comm_size) {
    nccl_inner_comms_ = inner_comms;
    nccl_inter_comms_ = inter_comms;
    node_size_ = comm_size;
  }

  void set_multi_mf_dim(int multi_mf_dim, int max_mf_dim) {
    multi_mf_dim_ = multi_mf_dim;
    max_mf_dim_ = max_mf_dim;
  }
#endif

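  // Routing below assumes 8 devices split into two fully connected groups
  // (0-3 and 4-7), with device i directly linked to its peer (i + 4) % 8 in
  // the other group. A transfer is staged through the peer only when sender
  // and receiver sit in different groups and are not direct peers:
  // need_transfer(0, 5) is true (route 0 -> 4 -> 5); need_transfer(0, 4) is
  // false (direct peer).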
  bool need_transfer(int send_id, int receive_id) {
    return ((send_id / 4 != receive_id / 4) && (send_id + 4) % 8 != receive_id);
  }

  // void dump_to_cpu(int index);

  int get_transfer_devid(int send_id) { return (send_id + 4) % 8; }

  void end_pass();

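  // A Path is the chain of staging hops between two devices; each Node owns
  // the intermediate key/value buffers and streams for one hop.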
  struct Node {
    ppStream in_stream;
    ppStream out_stream;
    char* key_storage;
    char* val_storage;
    int sync;
    size_t key_bytes_len;
    size_t val_bytes_len;
    int dev_num;
  };

  struct Path {
    std::vector<Node> nodes_;
  };

  struct CopyTask {
    Path* path;
    int step;
    CopyTask(Path* path_, int step_) : path(path_), step(step_) {}
  };

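  // Per-device staging buffers for shuffled keys/grads, grown lazily by
  // alloc() and reused across batches.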
  struct LocalStorage {
    LocalStorage() {}
    void init(int size, int dev_id) {
      place_ = platform::CUDAPlace(dev_id);
      alloc(size, true);
    }

    void alloc(size_t size, bool force = false) {
      // Allocation::size() reports bytes, so compare byte counts rather than
      // element counts (keys and grads always share the same element count).
      if (force || size * sizeof(KeyType) > all_keys_mem->size()) {
        all_keys_mem.reset();
        all_grads_mem.reset();
        all_keys_mem = memory::Alloc(place_, size * sizeof(KeyType));
        all_grads_mem = memory::Alloc(place_, size * sizeof(GradType));
        all_keys = reinterpret_cast<KeyType*>(all_keys_mem->ptr());
        all_grads = reinterpret_cast<GradType*>(all_grads_mem->ptr());
      }
      if (force || size * sizeof(KeyType) > local_keys_mem->size()) {
        local_keys_mem.reset();
        local_grads_mem.reset();
        local_keys_mem = memory::Alloc(place_, size * sizeof(KeyType));
        local_grads_mem = memory::Alloc(place_, size * sizeof(GradType));
        local_keys = reinterpret_cast<KeyType*>(local_keys_mem->ptr());
        local_grads = reinterpret_cast<GradType*>(local_grads_mem->ptr());
      }
    }

#if defined(PADDLE_WITH_CUDA)
    platform::CUDAPlace place_;

#elif defined(PADDLE_WITH_XPU_KP)
    platform::XPUPlace place_;
#endif
    std::shared_ptr<memory::Allocation> all_keys_mem;
    std::shared_ptr<memory::Allocation> all_grads_mem;

    KeyType* all_keys;
    GradType* all_grads;

    std::shared_ptr<memory::Allocation> local_keys_mem;
    std::shared_ptr<memory::Allocation> local_grads_mem;
    KeyType* local_keys;
    GradType* local_grads;
  };

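  // Precomputes path_[i][j] for every device pair: a direct hop, or a route
  // staged through get_transfer_devid() when need_transfer(i, j) holds.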
  void init_path();

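  // Thin stream helpers that dispatch to the CUDA or XPU runtime so the
  // shared communication code stays backend-agnostic.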
  template <typename StreamType>
  void sync_stream(const StreamType& stream) {
#if defined(PADDLE_WITH_CUDA)
    PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamSynchronize(stream));
#elif defined(PADDLE_WITH_XPU_KP)
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_wait(stream));
#endif
  }

  template <typename StreamType>
  void create_stream(StreamType* stream) {
#if defined(PADDLE_WITH_CUDA)
    PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamCreate(stream));
#elif defined(PADDLE_WITH_XPU_KP)
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_create(stream));
#endif
  }

  template <typename StreamType>
  void destroy_stream(StreamType stream) {
#if defined(PADDLE_WITH_CUDA)
    PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamDestroy(stream));
#elif defined(PADDLE_WITH_XPU_KP)
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_destroy(stream));
#endif
  }

  void create_storage(int start_index, int end_index, int keylen, int vallen);
  void destroy_storage(int start_index, int end_index);
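  // walk_to_dest copies keys/vals hop by hop along each Path toward the
  // owner device; walk_to_src walks the results back. The char* overloads
  // handle variable-width values of val_size bytes.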
  void walk_to_dest(int start_index, int gpu_num, int* h_left, int* h_right,
                    KeyType* src_key, GradType* src_val);
  void walk_to_dest(int start_index, int gpu_num, int* h_left, int* h_right,
                    KeyType* src_key, char* src_val, size_t val_size);
  void walk_to_src(int start_index, int gpu_num, int* h_left, int* h_right,
                   ValType* src_val);
  void walk_to_src(int start_index, int gpu_num, int* h_left, int* h_right,
                   char* src_val, size_t val_size);

 protected:
  using Table = HashTable<KeyType, ValType>;
  using PtrTable = HashTable<KeyType, ValType*>;
  std::vector<Table*> tables_;
  std::vector<PtrTable*> ptr_tables_;
  std::shared_ptr<HeterPsResource> resource_;
  std::vector<std::vector<Path>> path_;
  float load_factor_{0.75};
  int block_size_{256};
  std::unique_ptr<HeterCommKernel> heter_comm_kernel_;

 private:
  int topo_aware_{0};
  std::vector<LocalStorage> storage_;
  DynamicGradMerger merger_;
  int feanum_{1800 * 2048};
  int multi_node_{0};
  int node_size_;

#if defined(PADDLE_WITH_CUDA)
  std::vector<ncclComm_t> nccl_inner_comms_;
  std::vector<ncclComm_t> nccl_inter_comms_;
  int multi_mf_dim_{8};
  int max_mf_dim_{8};
  std::vector<std::shared_ptr<cub::CachingDeviceAllocator>> allocators_;
#endif
};

}  // end namespace framework
}  // end namespace paddle

#include "paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h"

#endif