ProcessGroupHeter.cc
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"

#include <chrono>

#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"

constexpr int64_t kWaitBlockTImeout = 10;

namespace paddle {
namespace distributed {

using Place = paddle::platform::Place;
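// Counters used to build unique, matching tensor names for successive
// switch-based Send/Recv pairs.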
int ProcessGroupHeter::send_count = 0;
int ProcessGroupHeter::recv_count = 0;

std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
    int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
  return std::make_shared<ProcessGroupHeter::HeterTask>(
      rank, comm_type, inputs);
}

ProcessGroupHeter::HeterTask::HeterTask(
    int rank, CommType CommType, const std::vector<phi::DenseTensor>& inputs)
    : Task(rank, inputs, CommType) {}

ProcessGroupHeter::HeterTask::~HeterTask() {}

bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }

// TODO(sheniang03): Add timeout support for Wait; the timeout parameter is
// currently unused.
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
  return true;
}

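// A heterogeneous process group composes two levels of communication:
// inner_pg_ runs collectives inside the local accelerator cluster (NCCL on
// GPU or HCCL on NPU), while cross-cluster traffic goes either through a
// ProcessGroupGloo (inter_pg_) or through an external switch service reached
// via switch_endpoint_.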
ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
                                     int rank,
                                     int size,
                                     const platform::Place& place,
                                     int gid,
                                     int local_rank,
                                     int local_size,
                                     int gloo_rank,
                                     int gloo_size,
                                     bool with_switch,
                                     std::string switch_endpoint,
                                     int src_rank,
                                     int dst_rank)
    : ProcessGroup(rank, size, place, gid),
      store_(store),
      local_rank_(local_rank),
      local_size_(local_size),
      gloo_rank_(gloo_rank),
      gloo_size_(gloo_size),
      with_switch_(with_switch),
      switch_endpoint_(switch_endpoint),
      src_rank_(src_rank),
      dst_rank_(dst_rank) {
#if defined(PADDLE_WITH_NCCL)
  inner_pg_ = std::make_shared<ProcessGroupNCCL>(
      store, local_rank, local_size, place_, IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
  inner_pg_ = std::make_shared<ProcessGroupHCCL>(
      store, local_rank, local_size, place_, IGNORE_ID);
#else
  PADDLE_THROW(platform::errors::Fatal(
      "ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
  if (local_rank_ == 0 && !with_switch_) {
    auto opts = ProcessGroupGloo::GlooOptions::create();
    opts->device = ProcessGroupGloo::createDefaultDevice();
    inter_pg_ = std::make_shared<ProcessGroupGloo>(
        store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts);
  }
}

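// Element-wise in-place addition: dst[i] += src[i] for i in [0, size).
// Used to accumulate the partial result received back from the switch
// module into the local allreduce result.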
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    *dst += *src;
    dst++;
    src++;
  }
}

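// Hierarchical allreduce: first allreduce inside the local cluster, then the
// local rank 0 reduces across clusters (via the switch service or Gloo), and
// finally the result is broadcast back inside the local cluster.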
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors),
      true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors),
      true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do allreduce in inner cluster
  auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts);
  task->Wait();

  // Step2: copy tensors to CPU
  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      // Use a fresh CPU tensor per iteration so the pushed copies do not
      // alias one another (matches the pattern used in Broadcast below).
      phi::DenseTensor cpu_tensor;
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    // Step3: do inter cluster allreduce
    if (with_switch_) {
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        std::vector<int64_t> send_size;
        send_size.push_back(dense_cpu_tensor.numel());
        int ret = client_->Send(
            gid_,
            {dense_cpu_tensor.name()},
            send_size,
            dense_cpu_tensor.data(),
            dense_cpu_tensor.numel() *
                framework::DataTypeSize(dense_cpu_tensor.dtype()));
        PADDLE_ENFORCE_EQ(ret,
                          0,
                          platform::errors::PreconditionNotMet(
                              "Send to the switch module error."));
        phi::DenseTensor cpu_tensor2;
        cpu_tensor2.AllocateFrom(
            std::make_unique<paddle::experimental::DefaultAllocator>(
                paddle::platform::CPUPlace())
                .get(),
            dense_cpu_tensor.dtype(),
            dense_cpu_tensor.numel());
        ret = client_->Recv(
            gid_,
            {dense_cpu_tensor.name()},
            cpu_tensor2.data(),
            cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype()));
        PADDLE_ENFORCE_EQ(ret,
                          0,
                          platform::errors::PreconditionNotMet(
                              "Recv from the switch module error."));

        switch (dense_cpu_tensor.dtype()) {
          case DataType::FLOAT32:
            _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor.data()),
                           reinterpret_cast<float*>(cpu_tensor2.data()),
                           dense_cpu_tensor.numel());
            break;
          case DataType::FLOAT64:
            _do_add<double>(reinterpret_cast<double*>(dense_cpu_tensor.data()),
                            reinterpret_cast<double*>(cpu_tensor2.data()),
                            dense_cpu_tensor.numel());
            break;
          case DataType::INT32:
            _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor.data()),
                         reinterpret_cast<int*>(cpu_tensor2.data()),
                         dense_cpu_tensor.numel());
            break;
          default:
            PADDLE_THROW(platform::errors::PreconditionNotMet(
                "Unsupported data type (%s) to do add.",
                framework::DataType2String(dense_cpu_tensor.dtype())));
        }
      }
    } else {
      auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    // Step4: copy cpu tensors back to gpu
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }

  // Step5: broadcast among inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::ALLREDUCE, in_tensors);
}

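// Hierarchical broadcast: broadcast inside the local cluster, let the local
// rank 0 exchange the data across clusters (switch service or Gloo), then
// broadcast the refreshed data inside the local cluster again.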
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const BroadcastOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors),
      true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors),
      true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do broadcast in inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  inner_pg_->Broadcast(in_tensors, out_tensors, b_opts);

  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      phi::DenseTensor cpu_tensor;
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    if (with_switch_) {
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        if (gloo_rank_ == 0) {
          std::vector<int64_t> send_size;
          send_size.push_back(dense_cpu_tensor.numel());
          int ret = client_->Send(
              gid_,
              {dense_cpu_tensor.name()},
              send_size,
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret,
                            0,
                            platform::errors::PreconditionNotMet(
                                "Send to the switch module error."));
        } else {
          int ret = client_->Recv(
              gid_,
              {dense_cpu_tensor.name()},
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret,
                            0,
                            platform::errors::PreconditionNotMet(
                                "Receive from the switch module error."));
        }
      }
    } else {
      auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}

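// Point-to-point send is only supported through the switch service: the GPU
// tensor is staged in host memory and pushed to the switch under a name the
// matching Recv call can reconstruct.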
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
    std::vector<phi::DenseTensor>& in_tensors, int peer) {
  PADDLE_ENFORCE_EQ(
      in_tensors.size(),
      1,
      platform::errors::PreconditionNotMet(
          "For each send operation, there can only be one tensor to send."));
  // Copy Tensor to cpu
  auto start = std::chrono::high_resolution_clock::now();
  phi::DenseTensor cpu_tensor;
  auto& gpu_tensor = in_tensors[0];
  framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
  PADDLE_ENFORCE_EQ(with_switch_,
                    true,
                    platform::errors::PreconditionNotMet(
                        "Gloo does not support the send operation."));
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = end - start;
  VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
          << ") from gpu to cpu for send " << std::setw(9)
          << " is: " << diff.count() << " s" << std::endl;

  // Send to switch
  HeterClient* client_ =
      HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
  int64_t tensor_size =
      cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype());
  std::vector<int64_t> send_size;
  send_size.push_back(tensor_size);
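  // The tensor name must be reproducible on the receiving side: it combines
  // the group id, a pair id derived from (src_rank, dst_rank), and a send
  // counter that advances in step with the receiver's recv counter.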
  auto id = src_rank_ * 10000 + dst_rank_;
  std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
                            std::string("_") + std::to_string(send_count++);
  VLOG(2) << "tensor_name:" << tensor_name;
  int ret = client_->Send(
      gid_, {tensor_name}, send_size, cpu_tensor.data(), tensor_size);
  PADDLE_ENFORCE_EQ(
      ret,
      0,
      platform::errors::PreconditionNotMet("Send to the switch module error."));
  return CreateTask(rank_, CommType::SEND, in_tensors);
}

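// Point-to-point receive mirrors Send: the buffer is pulled from the switch
// service under the reconstructed tensor name and then copied to the GPU.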
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
    std::vector<phi::DenseTensor>& out_tensors, int peer) {
  PADDLE_ENFORCE_EQ(
      out_tensors.size(),
      1,
      platform::errors::PreconditionNotMet(
          "For each rece operation, there can only be one tensor to receive."));

  // Copy Tensor to cpu
  phi::DenseTensor cpu_tensor;
  auto& gpu_tensor = out_tensors[0];
  cpu_tensor.Resize(gpu_tensor.dims());
  cpu_tensor.set_layout(gpu_tensor.layout());
  cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype());

  PADDLE_ENFORCE_EQ(with_switch_,
                    true,
                    platform::errors::PreconditionNotMet(
                        "Gloo does not support the send operation."));
  // recv from switch
  HeterClient* client_ =
      HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
  auto id = src_rank_ * 10000 + dst_rank_;
  std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
                            std::string("_") + std::to_string(recv_count++);
  VLOG(2) << "tensor_name: " << tensor_name;
  auto start = std::chrono::high_resolution_clock::now();
  int ret = client_->Recv(
      gid_,
      {tensor_name},
      cpu_tensor.data(),
      cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()));
  PADDLE_ENFORCE_EQ(ret,
                    0,
                    platform::errors::PreconditionNotMet(
                        "receive to the switch module error."));
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = end - start;
  double goodput = cpu_tensor.numel() *
                   framework::DataTypeSize(cpu_tensor.dtype()) / diff.count();
  VLOG(2) << "Goodput: " << goodput << "B/s" << std::endl;
  start = std::chrono::high_resolution_clock::now();
  framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
  end = std::chrono::high_resolution_clock::now();
  diff = end - start;
  VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
          << ") from cpu to gpu for recv " << std::setw(9)
          << " is: " << diff.count() << " s" << std::endl;
  return CreateTask(rank_, CommType::RECV, out_tensors);
}

}  // namespace distributed
}  // namespace paddle