/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once
#include <thread>
#include <vector>

#include "cub/cub.cuh"
#include "cub/util_allocator.cuh"
#if defined(PADDLE_WITH_CUDA)
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/timer.h"
#include "thrust/pair.h"
#elif defined(PADDLE_WITH_XPU_KP)
// #include "paddle/fluid/framework/fleet/heter_ps/optimizer_conf.h"
#include <xpu/runtime.h>

#include "paddle/fluid/platform/device/xpu/enforce_xpu.h"
#endif

#include "paddle/fluid/framework/fleet/heter_ps/hashtable.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_comm_kernel.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h"
#include "paddle/fluid/memory/allocation/allocator.h"
#include "paddle/fluid/memory/memory.h"
#include "paddle/fluid/platform/place.h"

#ifdef PADDLE_WITH_HETERPS

namespace paddle {
namespace framework {

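// Round LEN up to the next multiple of ALIGNVAL (ALIGNVAL must be a power of
// two).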
#define TYPEALIGN(ALIGNVAL, LEN) \
  (((uint64_t)(LEN) + ((ALIGNVAL)-1)) & ~((uint64_t)((ALIGNVAL)-1)))

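// Sharded key/value table spread across heterogeneous devices. Pulls and
// pushes route each key to its owning shard and move data along the
// Path/Node transfer routes declared below.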
template <typename KeyType, typename ValType, typename GradType>
class HeterComm {
 public:
  HeterComm(size_t capacity, std::shared_ptr<HeterPsResource> resource);
  virtual ~HeterComm();
  HeterComm(const HeterComm&) = delete;
  HeterComm& operator=(const HeterComm&) = delete;

  void split_input_to_shard(KeyType* d_keys, int* d_idx_ptr, size_t len,
                            int* left, int* right, int gpu_num);
  void merge_grad(int gpu_num, KeyType* d_keys, GradType* d_grads, size_t len,
                  int& uniq_len);  // NOLINT
  void dynamic_merge_grad(int gpu_num, KeyType* d_keys, GradType* d_grads,
                          size_t len, int& uniq_len);
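  // pull_sparse routes d_keys to their owning shards, gathers the values, and
  // copies them back into d_vals; build_ps bulk-loads host key/value data in
  // chunks across stream_num streams.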
  void pull_sparse(int num, KeyType* d_keys, ValType* d_vals, size_t len);
  void build_ps(int num, KeyType* h_keys, ValType* h_vals, size_t len,
                size_t chunk_size, int stream_num);
  void build_ps(int num, KeyType* h_keys, char* pool, size_t len,
                size_t feature_value_size, size_t chunk_size, int stream_num);
  void dump();
  void show_one_table(int gpu_num);
  int get_index_by_devid(int devid);

#if defined(PADDLE_WITH_CUDA)
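  // push_sparse scatters gradients to their owning shards and applies the
  // optimizer update on each one (the Sgd functor on CUDA builds).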
  template <typename Sgd>
  void push_sparse(int num, KeyType* d_keys, GradType* d_grads, size_t len,
                   Sgd& sgd);  // NOLINT
#elif defined(PADDLE_WITH_XPU_KP)
  void push_sparse(int num, KeyType* d_keys, GradType* d_grads, size_t len);
#endif

  void set_sparse_sgd(const OptimizerConfig& optimizer_config);
  void set_embedx_sgd(const OptimizerConfig& optimizer_config);

  int log2i(int x);

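  // Device-agnostic copy between arbitrary places, issued on the given stream
  // (the default stream if none is passed).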
  template <typename DstPlace, typename SrcPlace, typename StreamType>
  void memory_copy(DstPlace dst_place, void* dst, SrcPlace src_place,
                   const void* src, size_t count, StreamType stream = 0);

#if defined(PADDLE_WITH_CUDA)
  template <typename Sgd>
  void push_sparse_multi_node(int num, KeyType* d_keys, GradType* d_grads,
                              size_t len, Sgd& sgd);  // NOLINT

  template <typename Sgd>
  void update_one_table(int num, KeyType* d_keys, GradType* d_grads, size_t len,
                        Sgd& sgd);  // NOLINT

  int gather_one_node_grad(int num, KeyType* d_keys, GradType* d_grads,
                           int len);

  int gather_multi_node_grad(int num, KeyType* d_keys, GradType* d_grads,
                             int len);

  void set_nccl_comm_and_size(const std::vector<ncclComm_t>& inner_comms,
                              const std::vector<ncclComm_t>& inter_comms,
                              int comm_size) {
    nccl_inner_comms_ = inner_comms;
    nccl_inter_comms_ = inter_comms;
    node_size_ = comm_size;
  }

  void set_multi_mf_dim(int multi_mf_dim, int max_mf_dim) {
    multi_mf_dim_ = multi_mf_dim;
    max_mf_dim_ = max_mf_dim;
  }
#endif

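  // Peer-access heuristic, assuming 8 devices split into two groups of 4: a
  // cross-group copy must be staged through the mirrored device
  // (send_id + 4) % 8 unless the receiver already is that device.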
  bool need_transfer(int send_id, int receive_id) {
    return ((send_id / 4 != receive_id / 4) && (send_id + 4) % 8 != receive_id);
  }

  // void dump_to_cpu(int index);

  int get_transfer_devid(int send_id) { return (send_id + 4) % 8; }

  void end_pass();

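  // One hop on a transfer path: staging buffers plus the streams used to move
  // key/value bytes in and out of this device.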
  struct Node {
    ppStream in_stream;
    ppStream out_stream;
    char* key_storage;
    char* val_storage;
    int sync;
    size_t key_bytes_len;
    size_t val_bytes_len;
    int dev_num;
  };

  struct Path {
    std::vector<Node> nodes_;
  };

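  // A pending copy for one step along a Path.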
  struct CopyTask {
    Path* path;
    int step;
    CopyTask(Path* path_, int step_) : path(path_), step(step_) {}
  };

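  // Per-device scratch buffers used when shuffling keys and gradients between
  // devices.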
  struct LocalStorage {
    LocalStorage() {}
    void init(int size, int dev_id) {
      place_ = platform::CUDAPlace(dev_id);
      alloc(size, true);
    }

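    // Grow-only (re)allocation: buffers are replaced only when the requested
    // size exceeds the current capacity, or when force is set.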
    void alloc(size_t size, bool force = false) {
      if (force || size > all_keys_mem->size()) {
        all_keys_mem.reset();
        all_grads_mem.reset();
        all_keys_mem = memory::Alloc(place_, size * sizeof(KeyType));
        all_grads_mem = memory::Alloc(place_, size * sizeof(GradType));
        all_keys = reinterpret_cast<KeyType*>(all_keys_mem->ptr());
        all_grads = reinterpret_cast<GradType*>(all_grads_mem->ptr());
      }
      if (force || size > local_keys_mem->size()) {
        local_keys_mem.reset();
        local_grads_mem.reset();
        local_keys_mem = memory::Alloc(place_, size * sizeof(KeyType));
        local_grads_mem = memory::Alloc(place_, size * sizeof(GradType));
        local_keys = reinterpret_cast<KeyType*>(local_keys_mem->ptr());
        local_grads = reinterpret_cast<GradType*>(local_grads_mem->ptr());
      }
    }

#if defined(PADDLE_WITH_CUDA)
    platform::CUDAPlace place_;

#elif defined(PADDLE_WITH_XPU_KP)
    platform::XPUPlace place_;
#endif
    std::shared_ptr<memory::Allocation> all_keys_mem;
    std::shared_ptr<memory::Allocation> all_grads_mem;

    KeyType* all_keys;
    GradType* all_grads;

    std::shared_ptr<memory::Allocation> local_keys_mem;
    std::shared_ptr<memory::Allocation> local_grads_mem;
    KeyType* local_keys;
    GradType* local_grads;
  };

  void init_path();

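  // Thin wrappers that dispatch stream create/sync/destroy to CUDA or XPU,
  // depending on the build.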
  template <typename StreamType>
  void sync_stream(const StreamType& stream) {
#if defined(PADDLE_WITH_CUDA)
    PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamSynchronize(stream));
#elif defined(PADDLE_WITH_XPU_KP)
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_wait(stream));
#endif
  }

  template <typename StreamType>
  void create_stream(StreamType* stream) {
#if defined(PADDLE_WITH_CUDA)
    PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamCreate(stream));
#elif defined(PADDLE_WITH_XPU_KP)
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_create(stream));
#endif
  }

  template <typename StreamType>
  void destroy_stream(StreamType stream) {
#if defined(PADDLE_WITH_CUDA)
    PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamDestroy(stream));
#elif defined(PADDLE_WITH_XPU_KP)
    PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_destroy(stream));
#endif
  }

  void create_storage(int start_index, int end_index, int keylen, int vallen);
  void destroy_storage(int start_index, int end_index);
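  // walk_to_dest moves key/gradient buffers hop by hop toward each
  // destination device; walk_to_src copies the resulting values back
  // (implemented in heter_comm_inl.h).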
  void walk_to_dest(int start_index, int gpu_num, int* h_left, int* h_right,
                    KeyType* src_key, GradType* src_val);
  void walk_to_dest(int start_index, int gpu_num, int* h_left, int* h_right,
                    KeyType* src_key, char* src_val, size_t val_size);
  void walk_to_src(int start_index, int gpu_num, int* h_left, int* h_right,
                   ValType* src_val);
  void walk_to_src(int start_index, int gpu_num, int* h_left, int* h_right,
                   char* src_val, size_t val_size);

 protected:
  using Table = HashTable<KeyType, ValType>;
  using PtrTable = HashTable<KeyType, ValType*>;
  std::vector<Table*> tables_;
  std::vector<PtrTable*> ptr_tables_;
  std::shared_ptr<HeterPsResource> resource_;
  std::vector<std::vector<Path>> path_;
  float load_factor_{0.75};
  int block_size_{256};
  std::unique_ptr<HeterCommKernel> heter_comm_kernel_;

 private:
  int topo_aware_{0};
  std::vector<LocalStorage> storage_;
  DynamicGradMerger merger_;
  int feanum_{1800 * 2048};
  int multi_node_{0};
  int node_size_;

#if defined(PADDLE_WITH_CUDA)
  std::vector<ncclComm_t> nccl_inner_comms_;
  std::vector<ncclComm_t> nccl_inter_comms_;
  int multi_mf_dim_{8};
  int max_mf_dim_{8};
  std::vector<std::shared_ptr<cub::CachingDeviceAllocator>> allocators_;
#endif
};

}  // end namespace framework
}  // end namespace paddle

#include "paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h"

#endif