// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"

#include <chrono>

#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"

constexpr int64_t kWaitBlockTImeout = 10;

namespace paddle {
namespace distributed {

using Place = paddle::platform::Place;
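
// Counters used by Send/Recv to build unique tensor names when tensors are
// routed through the heter switch service.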
int ProcessGroupHeter::send_count = 0;
int ProcessGroupHeter::recv_count = 0;

std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
    int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
  return std::make_shared<ProcessGroupHeter::HeterTask>(
      rank, comm_type, inputs);
}

ProcessGroupHeter::HeterTask::HeterTask(
    int rank, CommType CommType, const std::vector<phi::DenseTensor>& inputs)
    : Task(rank, inputs, CommType) {}

ProcessGroupHeter::HeterTask::~HeterTask() {}

bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }

// TODO(sheniang03): Add timeout for wait, now timeout unused
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
  return true;
}

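// A ProcessGroupHeter layers two levels of communication: an inner process
// group (NCCL/RCCL, HCCL, or a custom-device backend) for the ranks inside one
// cluster, and an inter-cluster channel that is either a Gloo process group or
// an external heter switch service when with_switch is true.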
ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
                                     int rank,
                                     int size,
                                     const platform::Place& place,
                                     int gid,
                                     int local_rank,
                                     int local_size,
                                     int gloo_rank,
                                     int gloo_size,
                                     bool with_switch,
                                     std::string switch_endpoint,
                                     int src_rank,
                                     int dst_rank)
    : ProcessGroup(rank, size, place, gid),
      store_(store),
      local_rank_(local_rank),
      local_size_(local_size),
      gloo_rank_(gloo_rank),
      gloo_size_(gloo_size),
      with_switch_(with_switch),
      switch_endpoint_(switch_endpoint),
      src_rank_(src_rank),
      dst_rank_(dst_rank) {
#ifdef PADDLE_WITH_CUSTOM
  if (paddle::platform::is_custom_place(place_)) {
    inner_pg_ = std::make_shared<ProcessGroupCustom>(
        store, local_rank, local_size, place_, IGNORE_ID);
  } else {
#endif
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
    inner_pg_ = std::make_shared<ProcessGroupNCCL>(
        store, local_rank, local_size, place_, IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
  inner_pg_ = std::make_shared<ProcessGroupHCCL>(
      store, local_rank, local_size, place_, IGNORE_ID);
#else
  PADDLE_THROW(platform::errors::Unavailable(
      "ProcessGroupHeter only supports NCCL, RCCL and HCCL now."));
#endif
#ifdef PADDLE_WITH_CUSTOM
  }
#endif

  if (local_rank_ == 0 && !with_switch_) {
    auto opts = ProcessGroupGloo::GlooOptions::create();
    opts->device = ProcessGroupGloo::createDefaultDevice();
    inter_pg_ = std::make_shared<ProcessGroupGloo>(
        store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts);
  }
}

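// Element-wise accumulation helper, used to merge the locally reduced tensor
// with the tensor received from the remote cluster through the switch.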
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    *dst += *src;
    dst++;
    src++;
  }
}

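// Hierarchical allreduce:
//   Step1: allreduce inside the inner cluster;
//   Step2: local rank 0 copies the reduced tensors to CPU;
//   Step3: combine across clusters, either by exchanging the tensor with the
//          heter switch service and adding the remote result, or via a Gloo
//          allreduce;
//   Step4: copy the combined result back to the device;
//   Step5: broadcast the result to the other ranks of the inner cluster.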
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors),
      true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors),
      true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do allreduce in inner cluster
  auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts);
  task->Wait();

  // Step2: copy tensors to CPU
  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    phi::DenseTensor cpu_tensor;
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    // Step3: do inter cluster allreduce
    if (with_switch_) {
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        std::vector<int64_t> send_size;
        send_size.push_back(dense_cpu_tensor.numel());
        int ret = client_->Send(
            gid_,
            {dense_cpu_tensor.name()},
            send_size,
            dense_cpu_tensor.data(),
            dense_cpu_tensor.numel() *
                framework::DataTypeSize(dense_cpu_tensor.dtype()));
        PADDLE_ENFORCE_EQ(ret,
                          0,
                          platform::errors::PreconditionNotMet(
                              "Send to the switch module error."));
        phi::DenseTensor cpu_tensor2;
        cpu_tensor2.AllocateFrom(
            std::make_unique<paddle::experimental::DefaultAllocator>(
                paddle::platform::CPUPlace())
                .get(),
            dense_cpu_tensor.dtype(),
            dense_cpu_tensor.numel());
        ret = client_->Recv(
            gid_,
            {dense_cpu_tensor.name()},
            cpu_tensor2.data(),
            cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype()));
        PADDLE_ENFORCE_EQ(ret,
                          0,
                          platform::errors::PreconditionNotMet(
                              "Recv from the switch module error."));

        switch (dense_cpu_tensor.dtype()) {
          case DataType::FLOAT32:
            _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor.data()),
                           reinterpret_cast<float*>(cpu_tensor2.data()),
                           dense_cpu_tensor.numel());
            break;
          case DataType::FLOAT64:
            _do_add<double>(reinterpret_cast<double*>(dense_cpu_tensor.data()),
                            reinterpret_cast<double*>(cpu_tensor2.data()),
                            dense_cpu_tensor.numel());
            break;
          case DataType::INT32:
            _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor.data()),
                         reinterpret_cast<int*>(cpu_tensor2.data()),
                         dense_cpu_tensor.numel());
            break;
          default:
            PADDLE_THROW(platform::errors::PreconditionNotMet(
                "Unsupported data type (%s) to do add.",
                framework::DataType2String(dense_cpu_tensor.dtype())));
        }
      }
    } else {
      auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    // Step4: copy cpu tensors to gpu
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }

  // Step5: broadcast among inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::ALLREDUCE, in_tensors);
}

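// Hierarchical broadcast: broadcast inside the inner cluster first; on local
// rank 0, either exchange the tensor with the heter switch service (the
// gloo_rank 0 leader sends, the other cluster leaders receive) or run a Gloo
// broadcast across clusters; finally broadcast again inside the inner cluster.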
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const BroadcastOptions& opts) {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors),
      true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors),
      true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do broadcast in inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  inner_pg_->Broadcast(in_tensors, out_tensors, b_opts);

  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      phi::DenseTensor cpu_tensor;
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    if (with_switch_) {
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        if (gloo_rank_ == 0) {
          std::vector<int64_t> send_size;
          send_size.push_back(dense_cpu_tensor.numel());
          int ret = client_->Send(
              gid_,
              {dense_cpu_tensor.name()},
              send_size,
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret,
                            0,
                            platform::errors::PreconditionNotMet(
                                "Send to the switch module error."));
        } else {
          int ret = client_->Recv(
              gid_,
              {dense_cpu_tensor.name()},
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret,
                            0,
                            platform::errors::PreconditionNotMet(
                                "Receive from the switch module error."));
        }
      }
    } else {
      auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}

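// Point-to-point send through the heter switch service. The tensor is staged
// on the CPU and pushed to the switch under a name built from the group id,
// the src/dst rank pair, and the send counter; the matching Recv reconstructs
// the same name to fetch it.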
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
    std::vector<phi::DenseTensor>& in_tensors, int peer) {
  PADDLE_ENFORCE_EQ(
      in_tensors.size(),
      1,
      platform::errors::PreconditionNotMet(
          "For each send operation, there can only be one tensor to send."));
  // Copy Tensor to cpu
  auto start = std::chrono::high_resolution_clock::now();
  phi::DenseTensor cpu_tensor;
  auto& gpu_tensor = in_tensors[0];
  framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
  PADDLE_ENFORCE_EQ(with_switch_,
                    true,
                    platform::errors::PreconditionNotMet(
                        "Gloo does not support the send operation."));
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = end - start;
  VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
          << ") from gpu to cpu for send " << std::setw(9)
          << " is: " << diff.count() << " s" << std::endl;

  // Send to switch
  HeterClient* client_ =
      HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
  int64_t tensor_size =
      cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype());
  std::vector<int64_t> send_size;
  send_size.push_back(tensor_size);
  auto id = src_rank_ * 10000 + dst_rank_;
  std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
                            std::string("_") + std::to_string(send_count++);
  VLOG(2) << "tensor_name:" << tensor_name;
  int ret = client_->Send(
      gid_, {tensor_name}, send_size, cpu_tensor.data(), tensor_size);
  PADDLE_ENFORCE_EQ(
      ret,
      0,
      platform::errors::PreconditionNotMet("Send to the switch module error."));
  return CreateTask(rank_, CommType::SEND, in_tensors);
}

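// Point-to-point receive through the heter switch service: the counterpart of
// Send above, rebuilding the sender's tensor name from the recv counter.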
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
    std::vector<phi::DenseTensor>& out_tensors, int peer) {
  PADDLE_ENFORCE_EQ(
      out_tensors.size(),
      1,
      platform::errors::PreconditionNotMet(
          "For each recv operation, there can only be one tensor to receive."));

  // Copy Tensor to cpu
  phi::DenseTensor cpu_tensor;
  auto& gpu_tensor = out_tensors[0];
  cpu_tensor.Resize(gpu_tensor.dims());
  cpu_tensor.set_layout(gpu_tensor.layout());
  cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype());

  PADDLE_ENFORCE_EQ(with_switch_,
                    true,
                    platform::errors::PreconditionNotMet(
                        "Gloo does not support the recv operation."));
  // recv from switch
  HeterClient* client_ =
      HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
  auto id = src_rank_ * 10000 + dst_rank_;
  std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
                            std::string("_") + std::to_string(recv_count++);
  VLOG(2) << "tensor_name: " << tensor_name;
  auto start = std::chrono::high_resolution_clock::now();
  int ret = client_->Recv(
      gid_,
      {tensor_name},
      cpu_tensor.data(),
      cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()));
  PADDLE_ENFORCE_EQ(ret,
                    0,
                    platform::errors::PreconditionNotMet(
                        "Receive from the switch module error."));
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = end - start;
  double goodput = cpu_tensor.numel() *
                   framework::DataTypeSize(cpu_tensor.dtype()) / diff.count();
  VLOG(2) << "Goodput: " << goodput << "B/s" << std::endl;
  start = std::chrono::high_resolution_clock::now();
  framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
  end = std::chrono::high_resolution_clock::now();
  diff = end - start;
  VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
          << ") from cpu to gpu for recv " << std::setw(9)
          << " is: " << diff.count() << " s" << std::endl;
  return CreateTask(rank_, CommType::RECV, out_tensors);
}

}  // namespace distributed
}  // namespace paddle