ProcessGroupNCCL.cc 34.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
16

L
lilong12 已提交
17
#include "paddle/fluid/distributed/collective/Common.h"
18
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
19
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
B
Baibaifan 已提交
20
#include "paddle/fluid/platform/place.h"
L
LiYuRio 已提交
21
#include "paddle/phi/api/lib/utils/allocator.h"
B
Baibaifan 已提交
22
#include "paddle/phi/common/place.h"
23 24 25 26 27 28 29 30 31 32 33

// Boolean flags declared here (presumably defined elsewhere via gflags —
// confirm). In this file FLAGS_nccl_blocking_wait gates the host-side polling
// loop in NCCLTask::Wait, and FLAGS_use_stream_safe_cuda_allocator gates the
// memory::RecordStream calls made around each communication op.
DECLARE_bool(nccl_blocking_wait);
DECLARE_bool(use_stream_safe_cuda_allocator);

// Sleep interval, in milliseconds, between successive IsCompleted() polls in
// NCCLTask::Wait.
// NOTE(review): the identifier carries a stray capital ("TImeout"); a rename
// must also update its use inside NCCLTask::Wait.
constexpr int64_t kWaitBlockTImeout = 10;

namespace paddle {
namespace distributed {

void SyncDefaultStream(
    const std::vector<Place>& places,
L
Leo Chen 已提交
34 35
    std::vector<EventManager>& ncclEvents,                     // NOLINT
    std::vector<std::unique_ptr<phi::GPUContext>>& dev_ctx) {  // NOLINT
36
  for (size_t i = 0; i < places.size(); ++i) {
L
Leo Chen 已提交
37
    auto* default_ctx = static_cast<phi::GPUContext*>(
38
        platform::DeviceContextPool::Instance().Get(places[i]));
39 40
    ncclEvents[i].Record(*default_ctx);
    ncclEvents[i].Block(*dev_ctx[i]);
41 42 43 44
  }
}

std::shared_ptr<ProcessGroupNCCL::NCCLTask> ProcessGroupNCCL::CreateTask(
45 46 47
    std::vector<Place> places,
    int rank,
    CommType comm_type,
48
    const std::vector<phi::DenseTensor>& inputs) {
49 50
  return std::make_shared<ProcessGroupNCCL::NCCLTask>(
      places, rank, comm_type, inputs);
51 52
}

53
// Builds a task for the given places. The base TaskStream receives rank,
// inputs and the communication type; the per-place bookkeeping vectors
// (control_events_, ncclComms_) are sized to one slot per place.
ProcessGroupNCCL::NCCLTask::NCCLTask(
    const std::vector<Place>& places,
    int rank,
    CommType comm_type,
    const std::vector<phi::DenseTensor>& inputs)
    : TaskStream(rank, inputs, comm_type), places_(places) {
  const size_t num_places = places.size();
  control_events_.resize(num_places);
  ncclComms_.resize(num_places);
}

// Same as the four-argument constructor, but additionally forwards sync_op
// and use_calc_stream to the TaskStream base. One slot per place is reserved
// in control_events_ and ncclComms_.
ProcessGroupNCCL::NCCLTask::NCCLTask(
    const std::vector<Place>& places,
    int rank,
    CommType comm_type,
    const std::vector<phi::DenseTensor>& inputs,
    bool sync_op,
    bool use_calc_stream)
    : TaskStream(rank, inputs, comm_type, sync_op, use_calc_stream),
      places_(places) {
  const size_t num_places = places.size();
  control_events_.resize(num_places);
  ncclComms_.resize(num_places);
}

// Nothing to release explicitly; members clean up after themselves.
ProcessGroupNCCL::NCCLTask::~NCCLTask() = default;

void ProcessGroupNCCL::NCCLTask::SetOutputs(
79 80
    std::vector<phi::DenseTensor>& outputs) {  // NOLINT
  outputs_ = std::make_shared<std::vector<phi::DenseTensor>>(outputs);
81 82 83 84
}

void ProcessGroupNCCL::NCCLTask::SynchronizeStreams() {
  for (size_t i = 0; i < places_.size(); ++i) {
L
Leo Chen 已提交
85
    auto* default_ctx = static_cast<phi::GPUContext*>(
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
        platform::DeviceContextPool::Instance().Get(places_[i]));
    default_ctx->WaitEvent(control_events_[i].GetRawCudaEvent());
  }
}

// Returns true only if Query() succeeds for the control event of every place.
// Polling stops at the first event whose Query() reports false.
bool ProcessGroupNCCL::NCCLTask::IsCompleted() {
  bool all_done = true;
  for (size_t idx = 0; all_done && idx < places_.size(); ++idx) {
    all_done = control_events_[idx].Query();
  }
  return all_done;
}

101
// Validates the per-rank split sizes used to partition a tensor along dim[0],
// filling them in when the caller supplied none.
//
// split_sizes:  in/out. If empty, it is filled with size_ equal entries of
//               tensor_shape[0] / size_ (dim[0] must be divisible by size_).
//               Otherwise it must have exactly size_ entries whose sum equals
//               tensor_shape[0].
// tensor_shape: dimensions of the tensor being split; must be non-empty.
//
// Raises InvalidArgument (via PADDLE_ENFORCE_EQ) when any condition fails.
void ProcessGroupNCCL::CheckSplitSizes(std::vector<int64_t>* split_sizes,
                                       std::vector<int64_t> tensor_shape) {
  // A 0-D tensor has no dim[0]; reject it here instead of indexing into an
  // empty vector below.
  PADDLE_ENFORCE_EQ(
      tensor_shape.empty(),
      false,
      platform::errors::InvalidArgument(
          "Tensor must have at least one dimension to be split."));

  const int64_t dim0 = tensor_shape[0];
  const int64_t len_size = static_cast<int64_t>(split_sizes->size());
  if (len_size == 0) {
    PADDLE_ENFORCE_EQ(dim0 % size_ == 0,
                      true,
                      platform::errors::InvalidArgument(
                          "Tensor's dim[0] must be divisible by group size "
                          "when split_sizes not given."));
    // Even split: every rank gets the same number of rows.
    split_sizes->insert(
        split_sizes->end(), size_, static_cast<int64_t>(dim0 / size_));
  } else {
    PADDLE_ENFORCE_EQ(
        len_size == size_,
        true,
        platform::errors::InvalidArgument(
            "The length of split_sizes must be equal to group size."));
    const auto sum_size = std::accumulate(
        split_sizes->begin(), split_sizes->end(), static_cast<int64_t>(0));
    PADDLE_ENFORCE_EQ(
        sum_size == dim0,
        true,
        platform::errors::InvalidArgument(
            "The sum of split_sizes must be equal to tensor's dim[0]."));
  }
}

130 131
// TODO(sheniang03): Add timeout for wait, now timeout unused
bool ProcessGroupNCCL::NCCLTask::Wait(std::chrono::milliseconds timeout) {
132 133 134 135 136 137 138
  // Warning here when use calc stream but also invoke waiting explicitly.
  if (UseCalcStream()) {
    VLOG(3) << "Warning: The communication is on calc stream, wait here is "
               "useless.";
    return true;
  }

139 140 141 142 143 144 145
  SynchronizeStreams();
  if (FLAGS_nccl_blocking_wait) {
    // NOTE(shenliang03): It will block host for sync
    while (!IsCompleted()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(kWaitBlockTImeout));
    }
  }
B
Baibaifan 已提交
146 147 148 149 150

  if (!barrierTensors_.empty()) {
    // If we use the work to do barrier, we should block cpu
    for (auto& place : places_) {
      platform::CUDADeviceGuard gpuGuard(place);
S
ShenLiang 已提交
151
#ifdef PADDLE_WITH_CUDA
B
Baibaifan 已提交
152
      PADDLE_ENFORCE_GPU_SUCCESS(cudaDeviceSynchronize());
S
ShenLiang 已提交
153 154 155
#else
      PADDLE_ENFORCE_GPU_SUCCESS(hipDeviceSynchronize());
#endif
B
Baibaifan 已提交
156 157
    }
  }
158 159 160 161 162 163
  return true;
}

// Same as Wait: delegates to Wait(kWaitTimeout) and discards its result.
void ProcessGroupNCCL::NCCLTask::Synchronize() {
  static_cast<void>(Wait(kWaitTimeout));
}

164
// Constructs the process group: rank, size, place and gid go to the
// ProcessGroupStream base, the store is kept in store_, and the current
// device is set to the device of place_.
ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr<Store>& store,
                                   int rank,
                                   int size,
                                   const platform::Place& place,
                                   int gid)
    : ProcessGroupStream(rank, size, place, gid), store_(store) {
  const auto device_id = place_.device;
  platform::SetDeviceId(device_id);
}
172 173 174

void ProcessGroupNCCL::BroadcastUniqueNCCLID(
    std::vector<ncclUniqueId>& nccl_ids) {  // NOLINT
175 176
  if (rank_ == 0) {
    for (size_t i = 0; i < nccl_ids.size(); i++) {
177 178
      auto key = "ProcessGroupNCCL/nccl_ids/" + std::to_string(gid_) + "/" +
                 std::to_string(i);
179 180 181 182 183 184 185
      auto nccl_id = std::vector<uint8_t>(
          reinterpret_cast<uint8_t*>(&nccl_ids[i]),
          reinterpret_cast<uint8_t*>(&nccl_ids[i]) + NCCL_UNIQUE_ID_BYTES);
      store_->set(key, nccl_id);
    }
  } else {
    for (size_t i = 0; i < nccl_ids.size(); i++) {
186 187
      auto key = "ProcessGroupNCCL/nccl_ids/" + std::to_string(gid_) + "/" +
                 std::to_string(i);
188 189 190
      auto ret = store_->get(key);
      std::memcpy(&nccl_ids[i], ret.data(), ret.size());
    }
191 192 193 194 195 196
  }
}

// create NCCLManager cache for places_key
void ProcessGroupNCCL::CreateNCCLManagerCache(
    const std::string& places_key, const std::vector<Place>& places) {
197 198
  PADDLE_ENFORCE_EQ(places_key.empty(),
                    false,
199 200 201 202 203 204 205 206 207 208 209 210
                    platform::errors::PreconditionNotMet(
                        "Not able to create/get the NCCL Communicator since "
                        "the GPU place are not known"));

  std::vector<std::shared_ptr<NCCLCommManager>> nccl_comms;
  nccl_comms.resize(places.size());

  // using vector just for broadcast
  std::vector<ncclUniqueId> nccl_ids;
  nccl_ids.resize(1);
  auto& nccl_id = nccl_ids.front();

B
Baibaifan 已提交
211 212 213 214
  for (auto& place : places) {
    used_place_ids_.insert(place.GetDeviceId());
  }

215 216 217 218 219
  if (rank_ == 0) {
    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGetUniqueId(&nccl_id));
  }
  BroadcastUniqueNCCLID(nccl_ids);

220 221
  VLOG(3) << "init nccl rank: " << rank_ << ", nranks: " << size_
          << ", place: " << places_key
222 223
          << ", nccl uniqueid: " << SerializeNCCLUniqueId(nccl_id);

L
Leo Chen 已提交
224
  std::vector<std::unique_ptr<phi::GPUContext>> dev_ctx;
225 226 227 228 229 230 231
  dev_ctx.resize(places.size());

  PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupStart());

  for (size_t i = 0; i < places.size(); ++i) {
    platform::CUDADeviceGuard guard(places[i]);
    nccl_comms[i] = NCCLCommManager::Create(GetSize(), GetRank(), nccl_id);
L
Leo Chen 已提交
232
    dev_ctx[i].reset(new phi::GPUContext(places[i]));
233 234 235 236 237 238 239 240 241 242 243 244 245
  }

  PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupEnd());

  std::vector<EventManager> events;
  events.resize(places.size());

  // These caches will be useful to process sync/wait/communicate
  places_to_events_.emplace(places_key, std::move(events));
  places_to_ncclcomm_.emplace(places_key, std::move(nccl_comms));
  places_to_ctx_.emplace(places_key, std::move(dev_ctx));
}

246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
// Runs one collective operation over `inputs`/`outputs` and returns the task
// tracking it.
//
// inputs / outputs: tensor lists indexed in parallel; entry i is handled on
//                   places[i] with communicator i of the cached set.
// fn:               callable invoked as fn(input, output, ncclComm, stream)
//                   once per input inside a single NCCL group.
// comm_type:        forwarded to the created NCCLTask.
// sync_op:          forwarded to the created NCCLTask.
// use_calc_stream:  when true, fn is issued on the stream of the pool's GPU
//                   context for the place; otherwise on the stream of the
//                   cached communication context.
//
// NOTE(review): the cache maps are read after mutex_ has been released; only
// the find/create step is protected by the lock.
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    Fn fn,
    CommType comm_type,
    bool sync_op,
    bool use_calc_stream) {
  const auto& places = GetPlaceList(inputs);
  const auto& key = GetKeyFromPlaces(places);

  // Lazily create communicators/contexts/events for this set of places.
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (places_to_ncclcomm_.find(key) == places_to_ncclcomm_.end()) {
      CreateNCCLManagerCache(key, places);
    }
  }

  auto& nccl_comms = places_to_ncclcomm_[key];

  // NOTE(review): this is done even when use_calc_stream is true — confirm
  // whether the extra record/block on the comm context is intended then.
  SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);

  auto task = std::make_shared<ProcessGroupNCCL::NCCLTask>(
      places, rank_, comm_type, inputs, sync_op, use_calc_stream);

  // Guard constructed without a device; the device is selected per iteration.
  platform::CUDADeviceGuard cuda_guard;

  // Issue fn for every input inside one NCCL group.
  {
    platform::NCCLGroupGuard nccl_guard;
    for (size_t i = 0; i < inputs.size(); ++i) {
      cuda_guard.SetDevice(places[i]);

      // Pick the stream the collective is enqueued on.
      gpuStream_t nccl_stream;
      if (use_calc_stream) {
        nccl_stream =
            static_cast<phi::GPUContext*>(
                platform::DeviceContextPool::Instance().Get(places[i]))
                ->stream();
      } else {
        nccl_stream = places_to_ctx_[key][i]->stream();
      }

      fn(inputs[i], outputs[i], nccl_comms[i]->GetNcclComm(), nccl_stream);
    }
  }

  // With the stream-safe allocator enabled, register each input's holder with
  // the stream the collective was issued on (same stream choice as above).
  if (FLAGS_use_stream_safe_cuda_allocator) {
    for (size_t i = 0; i < inputs.size(); ++i) {
      cuda_guard.SetDevice(places[i]);

      gpuStream_t nccl_stream;
      if (use_calc_stream) {
        nccl_stream =
            static_cast<phi::GPUContext*>(
                platform::DeviceContextPool::Instance().Get(places[i]))
                ->stream();
      } else {
        nccl_stream = places_to_ctx_[key][i]->stream();
      }

      memory::RecordStream(inputs[i].Holder(), nccl_stream);
    }
  }

  // Adding stream event dependency only when use comm stream
  if (!use_calc_stream) {
    for (size_t i = 0; i < inputs.size(); ++i) {
      cuda_guard.SetDevice(places[i]);
      task->control_events_[i].Record(*places_to_ctx_[key][i]);
    }
  }

  return task;
}

321 322
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
323
    std::vector<phi::DenseTensor>& inputs,
324 325 326
    std::vector<phi::DenseTensor>& outputs,
    Fn fn,
    CommType op_type) {
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
  const auto places = GetPlaceList(inputs);
  const auto key = GetKeyFromPlaces(places);

  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (places_to_ncclcomm_.find(key) == places_to_ncclcomm_.end()) {
      CreateNCCLManagerCache(key, places);
    }
  }

  auto& nccl_comms = places_to_ncclcomm_[key];

  SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);

  auto task = CreateTask(places, rank_, op_type, inputs);

  // construct uninitialize guard for device
  platform::CUDADeviceGuard cuda_guard;

S
ShenLiang 已提交
346 347
  {
    platform::NCCLGroupGuard nccl_guard;
348 349
    for (size_t i = 0; i < inputs.size(); ++i) {
      cuda_guard.SetDevice(places[i]);
S
ShenLiang 已提交
350 351
      const auto& nccl_stream = places_to_ctx_[key][i]->stream();
      fn(inputs[i], outputs[i], nccl_comms[i]->GetNcclComm(), nccl_stream);
352 353 354
    }
  }

S
ShenLiang 已提交
355
  if (FLAGS_use_stream_safe_cuda_allocator) {
356 357
    for (size_t i = 0; i < inputs.size(); ++i) {
      cuda_guard.SetDevice(places[i]);
S
ShenLiang 已提交
358 359
      memory::RecordStream(inputs[i].Holder(),
                           places_to_ctx_[key][i]->stream());
360 361 362 363 364 365 366 367 368 369
    }
  }

  for (size_t i = 0; i < inputs.size(); ++i) {
    cuda_guard.SetDevice(places[i]);
    task->control_events_[i].Record(*places_to_ctx_[key][i]);
  }
  return task;
}

370 371
// Single-tensor variant: issues fn(in, out, ncclComm, stream) on the cached
// communication stream for in->place(). No task is created or returned.
//
// When FLAGS_use_stream_safe_cuda_allocator is set, the input's holder is
// registered with the communication stream before fn is issued.
template <typename Fn>
void ProcessGroupNCCL::Collective(const phi::DenseTensor* in,
                                  phi::DenseTensor* out,
                                  Fn fn,
                                  CommType op_type) {
  std::vector<Place> device_places;
  device_places.push_back(in->place());
  const auto cache_key = GetKeyFromPlaces(device_places);

  // Lazily create communicators/contexts/events for this place.
  {
    std::lock_guard<std::mutex> cache_lock(mutex_);
    if (places_to_ncclcomm_.count(cache_key) == 0) {
      CreateNCCLManagerCache(cache_key, device_places);
    }
  }

  auto& comms = places_to_ncclcomm_[cache_key];
  auto& comm_ctxs = places_to_ctx_[cache_key];

  SyncDefaultStream(device_places, places_to_events_[cache_key], comm_ctxs);

  // construct uninitialize guard for device
  platform::CUDADeviceGuard device_guard;
  const Place& target_place = device_places[0];

  if (FLAGS_use_stream_safe_cuda_allocator) {
    device_guard.SetDevice(target_place);
    memory::RecordStream(in->Holder(), comm_ctxs[0]->stream());
  }

  {
    platform::NCCLGroupGuard group_guard;
    device_guard.SetDevice(target_place);
    const gpuStream_t comm_stream = comm_ctxs[0]->stream();
    fn(in, out, comms[0]->GetNcclComm(), comm_stream);
  }

  device_guard.SetDevice(target_place);
}

B
Baibaifan 已提交
408 409
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::PointToPoint(
410 411 412
    std::vector<phi::DenseTensor>& tensors,
    Fn fn,
    int dst_rank,
413
    CommType op_type) {
B
Baibaifan 已提交
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
  const auto places = GetPlaceList(tensors);
  const auto key = GetKeyFromPlaces(places);

  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (places_to_ncclcomm_.find(key) == places_to_ncclcomm_.end()) {
      CreateNCCLManagerCache(key, places);
    }
  }

  auto& nccl_comms = places_to_ncclcomm_[key];

  SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);

  auto task = CreateTask(places, rank_, op_type, tensors);

  // construct uninitialize guard for device
  platform::CUDADeviceGuard cuda_guard;

  if (FLAGS_use_stream_safe_cuda_allocator) {
    for (size_t i = 0; i < tensors.size(); ++i) {
      cuda_guard.SetDevice(places[i]);
436
      memory::RecordStream(tensors[i].Holder(),
B
Baibaifan 已提交
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
                           places_to_ctx_[key][i]->stream());
    }
  }

  {
    platform::NCCLGroupGuard nccl_guard;
    for (size_t i = 0; i < tensors.size(); ++i) {
      cuda_guard.SetDevice(places[i]);
      const auto& nccl_stream = places_to_ctx_[key][i]->stream();
      fn(tensors[i], nccl_comms[i]->GetNcclComm(), nccl_stream, dst_rank);
    }
  }

  for (size_t i = 0; i < tensors.size(); ++i) {
    cuda_guard.SetDevice(places[i]);
    task->control_events_[i].Record(*places_to_ctx_[key][i]);
  }
  return task;
}

457
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
458
    std::vector<phi::DenseTensor>& in_tensors,
459 460
    std::vector<phi::DenseTensor>& out_tensors,
    const AllreduceOptions& opts) {
461
  PADDLE_ENFORCE_EQ(
462 463
      CheckTensorsInCudaPlace(in_tensors),
      true,
464
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
465
  return Collective(
466 467 468 469 470 471
      in_tensors,
      out_tensors,
      [&](const phi::DenseTensor& input,
          phi::DenseTensor& output,
          ncclComm_t comm,
          const gpuStream_t& stream) {
472
        return platform::dynload::ncclAllReduce(
473 474 475
            input.data(),
            output.data(),
            input.numel(),
476
            platform::ToNCCLDataType(input.type()),
477 478 479
            ToNCCLRedType(opts.reduce_op),
            comm,
            stream);
480 481
      },
      CommType::ALLREDUCE);
482 483
}

484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
// All-reduce of in_tensors into out_tensors using the reduction selected by
// opts.reduce_op; sync_op and use_calc_stream are forwarded to Collective.
// Raises InvalidArgument unless every input is on a CUDA place.
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const AllreduceOptions& opts,
    bool sync_op,
    bool use_calc_stream) {
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors),
      true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));

  // Per-tensor NCCL call; invoked by Collective for each input/output pair.
  auto all_reduce_fn = [&opts](const phi::DenseTensor& input,
                               phi::DenseTensor& output,
                               ncclComm_t comm,
                               const gpuStream_t& stream) {
    return platform::dynload::ncclAllReduce(
        input.data(),
        output.data(),
        input.numel(),
        platform::ToNCCLDataType(input.type()),
        ToNCCLRedType(opts.reduce_op),
        comm,
        stream);
  };

  return Collective(in_tensors,
                    out_tensors,
                    all_reduce_fn,
                    CommType::ALLREDUCE,
                    sync_op,
                    use_calc_stream);
}

515
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
516
    std::vector<phi::DenseTensor>& in_tensors,
517 518
    std::vector<phi::DenseTensor>& out_tensors,
    const BroadcastOptions& opts) {
519
  PADDLE_ENFORCE_EQ(
520 521
      CheckTensorsInCudaPlace(in_tensors),
      true,
522 523
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));

524
  return Collective(
525 526 527 528 529
      in_tensors,
      out_tensors,
      [&](phi::DenseTensor& input,
          phi::DenseTensor& output,
          ncclComm_t comm,
530 531 532 533
          const gpuStream_t& stream) {
        const auto root =
            opts.source_rank * in_tensors.size() + opts.source_root;
        return platform::dynload::ncclBroadcast(
534 535 536 537 538 539 540
            input.data(),
            output.data(),
            input.numel(),
            platform::ToNCCLDataType(input.type()),
            root,
            comm,
            stream);
541 542
      },
      CommType::BROADCAST);
543 544
}

B
Baibaifan 已提交
545 546
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
    const BarrierOptions& opts) {
B
Baibaifan 已提交
547 548
  // Only support single card single process
  std::vector<phi::GPUPlace> places = {place_};
B
Baibaifan 已提交
549

550
  std::vector<phi::DenseTensor> barrierTensors;
B
Baibaifan 已提交
551 552 553 554 555
  barrierTensors.reserve(places.size());

  platform::CUDADeviceGuard gpuGuard;
  for (auto& place : places) {
    gpuGuard.SetDeviceIndex(place.GetDeviceId());
L
LiYuRio 已提交
556 557 558 559
    phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim({1}));
    auto allocator = std::unique_ptr<phi::Allocator>(
        new paddle::experimental::DefaultAllocator(place));
    barrierTensors.emplace_back(allocator.get(), meta);
B
Baibaifan 已提交
560
  }
561 562
  auto task = ProcessGroupNCCL::AllReduce(
      barrierTensors, barrierTensors, AllreduceOptions());
B
Baibaifan 已提交
563 564 565 566 567
  auto nccl_task = dynamic_cast<ProcessGroupNCCL::NCCLTask*>(task.get());
  nccl_task->barrierTensors_ = std::move(barrierTensors);
  return task;
}

568 569
// Validates a tensor list used for point-to-point communication. Raises
// InvalidArgument when
//   - the list is empty,
//   - it holds more than num_devices tensors,
//   - any tensor is not on a GPU place, or
//   - two tensors share the same place.
void CheckTensorsInDifferentDevices(
    const std::vector<phi::DenseTensor>& tensors, const size_t num_devices) {
  PADDLE_ENFORCE_EQ(
      tensors.empty(),
      false,
      platform::errors::InvalidArgument("Tensor list must be nonempty."));
  PADDLE_ENFORCE_LE(
      tensors.size(),
      num_devices,
      platform::errors::InvalidArgument(
          "Tensor list mustn't be larger than the number of available GPUs."));

  // Places seen so far; a failed insert means a duplicate device.
  std::set<Place> seen_places;

  for (const auto& tensor : tensors) {
    const bool on_gpu = platform::is_gpu_place(tensor.place());
    PADDLE_ENFORCE_EQ(on_gpu,
                      true,
                      platform::errors::InvalidArgument(
                          "Tensors must be CUDA and dense tensor."));

    const bool first_use = seen_places.insert(tensor.place()).second;
    PADDLE_ENFORCE_EQ(first_use,
                      true,
                      platform::errors::InvalidArgument(
                          "Tensors must be on distinct GPU devices."));
  }
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send(
597
    std::vector<phi::DenseTensor>& tensors, int dst_rank) {
B
Baibaifan 已提交
598 599
  CheckTensorsInDifferentDevices(tensors, static_cast<size_t>(GetSize()));

600 601
  auto task = PointToPoint(
      tensors,
602 603 604
      [&](phi::DenseTensor& input,
          ncclComm_t comm,
          const gpuStream_t& stream,
605 606
          int dst_rank) {
        return platform::dynload::ncclSend(
607 608 609 610 611 612
            input.data(),
            input.numel(),
            platform::ToNCCLDataType(input.dtype()),
            dst_rank,
            comm,
            stream);
613
      },
614 615
      dst_rank,
      CommType::SEND);
B
Baibaifan 已提交
616 617 618 619
  return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
620
    std::vector<phi::DenseTensor>& tensors, int src_rank) {
B
Baibaifan 已提交
621 622
  CheckTensorsInDifferentDevices(tensors, static_cast<size_t>(GetSize()));

623 624
  auto task = PointToPoint(
      tensors,
625 626 627
      [&](phi::DenseTensor& output,
          ncclComm_t comm,
          const gpuStream_t& stream,
628 629
          int src_rank) {
        return platform::dynload::ncclRecv(
630 631 632 633 634 635
            output.data(),
            output.numel(),
            platform::ToNCCLDataType(output.dtype()),
            src_rank,
            comm,
            stream);
636
      },
637 638
      src_rank,
      CommType::RECV);
639 640 641 642 643 644 645 646 647 648 649 650 651 652 653
  return task;
}

// Sends the elements [offset, offset + length) of the flattened view of
// `tensors` to rank dst_rank. The slice shares storage with the original
// tensor (ShareDataWith), so no data is copied before the send.
// Unlike Send, no CheckTensorsInDifferentDevices validation is performed.
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send_Partial(
    phi::DenseTensor& tensors, int dst_rank, int offset, int length) {
  // View the tensor as 1-D so that Slice addresses individual elements.
  phi::DenseTensor flat_view;
  flat_view.ShareDataWith(tensors).Resize({tensors.numel()});

  std::vector<phi::DenseTensor> send_slices;
  send_slices.push_back(flat_view.Slice(offset, offset + length));

  // Per-tensor NCCL call; `peer` receives the dst_rank PointToPoint forwards.
  auto send_fn = [](phi::DenseTensor& input,
                    ncclComm_t comm,
                    const gpuStream_t& stream,
                    int peer) {
    return platform::dynload::ncclSend(
        input.data(),
        input.numel(),
        platform::ToNCCLDataType(input.dtype()),
        peer,
        comm,
        stream);
  };

  return PointToPoint(send_slices, send_fn, dst_rank, CommType::SEND);
}

// Receives from rank src_rank into the elements [offset, offset + length) of
// the flattened view of `tensors`. The slice shares storage with the original
// tensor (ShareDataWith), so received data lands directly in `tensors`.
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv_Partial(
    phi::DenseTensor& tensors, int src_rank, int offset, int length) {
  // View the tensor as 1-D so that Slice addresses individual elements.
  phi::DenseTensor flat_view;
  flat_view.ShareDataWith(tensors).Resize({tensors.numel()});

  std::vector<phi::DenseTensor> recv_slices;
  recv_slices.push_back(flat_view.Slice(offset, offset + length));

  // Per-tensor NCCL call; `peer` receives the src_rank PointToPoint forwards.
  auto recv_fn = [](phi::DenseTensor& output,
                    ncclComm_t comm,
                    const gpuStream_t& stream,
                    int peer) {
    return platform::dynload::ncclRecv(
        output.data(),
        output.numel(),
        platform::ToNCCLDataType(output.dtype()),
        peer,
        comm,
        stream);
  };

  return PointToPoint(recv_slices, recv_fn, src_rank, CommType::RECV);
}

703
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather(
704 705
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors) {
706
  PADDLE_ENFORCE_EQ(
707 708
      CheckTensorsInCudaPlace(in_tensors),
      true,
709 710
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
711 712
      CheckTensorsInCudaPlace(out_tensors),
      true,
713
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
714
  return Collective(
715 716 717 718 719 720
      in_tensors,
      out_tensors,
      [&](const phi::DenseTensor& input,
          phi::DenseTensor& output,
          ncclComm_t comm,
          const gpuStream_t& stream) {
721
        return platform::dynload::ncclAllGather(
722 723 724 725 726 727
            input.data(),
            output.data(),
            input.numel(),
            platform::ToNCCLDataType(input.dtype()),
            comm,
            stream);
728 729
      },
      CommType::ALLGATHER);
730 731
}

732 733
void* GetPointerByOffset(void* raw_pointer,
                         size_t offset,
734 735 736 737 738 739 740
                         experimental::DataType type) {
  if (type == experimental::DataType::FLOAT32) {
    return reinterpret_cast<void*>(reinterpret_cast<float*>(raw_pointer) +
                                   offset);
  } else if (type == experimental::DataType::FLOAT64) {
    return reinterpret_cast<void*>(reinterpret_cast<double*>(raw_pointer) +
                                   offset);
741 742 743
  } else if (type == experimental::DataType::FLOAT16) {
    return reinterpret_cast<void*>(reinterpret_cast<int16_t*>(raw_pointer) +
                                   offset);
744 745 746 747 748 749
  } else if (type == experimental::DataType::INT32) {
    return reinterpret_cast<void*>(reinterpret_cast<int32_t*>(raw_pointer) +
                                   offset);
  } else if (type == experimental::DataType::INT64) {
    return reinterpret_cast<void*>(reinterpret_cast<int64_t*>(raw_pointer) +
                                   offset);
750 751 752 753 754 755 756 757
  } else if (type == experimental::DataType::INT8) {
    return reinterpret_cast<void*>(reinterpret_cast<int8_t*>(raw_pointer) +
                                   offset);
  } else if (type == experimental::DataType::UINT8) {
    return reinterpret_cast<void*>(reinterpret_cast<uint8_t*>(raw_pointer) +
                                   offset);
  } else if (type == experimental::DataType::BOOL) {
    return reinterpret_cast<void*>(reinterpret_cast<bool*>(raw_pointer) +
758 759 760 761 762
                                   offset);
  } else {
    PADDLE_THROW(platform::errors::Unimplemented(
        "This datatype in nccl is not supported."));
  }
763
  return nullptr;
764 765
}

766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796
// All-gathers `length` elements of each input tensor, starting `offset`
// elements into its buffer, into the matching output tensor.
// Raises InvalidArgument unless all inputs and all outputs are on CUDA places.
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather_Partial(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    int offset,
    int length) {
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors),
      true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors),
      true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));

  // Per-tensor NCCL call; invoked by Collective for each input/output pair.
  auto partial_gather_fn = [offset, length](phi::DenseTensor& input,
                                            phi::DenseTensor& output,
                                            ncclComm_t comm,
                                            const gpuStream_t& stream) {
    void* send_begin = GetPointerByOffset(input.data(), offset, input.dtype());
    return platform::dynload::ncclAllGather(
        send_begin,
        output.data(),
        length,
        platform::ToNCCLDataType(input.dtype()),
        comm,
        stream);
  };

  return Collective(
      in_tensors, out_tensors, partial_gather_fn, CommType::ALLGATHER);
}

797
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll(
798 799
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors) {
800
  PADDLE_ENFORCE_EQ(
801 802
      CheckTensorsInCudaPlace(in_tensors),
      true,
803 804
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
805 806
      CheckTensorsInCudaPlace(out_tensors),
      true,
807 808
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  return Collective(
809 810 811 812 813
      in_tensors,
      out_tensors,
      [&](phi::DenseTensor& input,
          phi::DenseTensor& output,
          ncclComm_t comm,
814 815 816 817 818
          const gpuStream_t& stream) {
        size_t offset = 0;
        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupStart());
        for (auto i = 0; i < size_; i++) {
          PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
819
              GetPointerByOffset(input.data(), offset, input.dtype()),
820 821 822 823 824
              input.numel() / size_,
              platform::ToNCCLDataType(input.dtype()),
              i,
              comm,
              stream));
825
          PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
826
              GetPointerByOffset(output.data(), offset, input.dtype()),
827 828 829 830 831
              input.numel() / size_,
              platform::ToNCCLDataType(input.dtype()),
              i,
              comm,
              stream));
832
          offset += input.numel() / size_;
833 834 835
        }
        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupEnd());
      },
836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865
      CommType::ALLTOALL);
}

// All-to-all exchange with per-peer split sizes.
//
// For every peer i in [0, size_), sends in_sizes[i] leading-dimension rows of
// the input tensor to rank i and receives out_sizes[i] leading-dimension rows
// from rank i into the output tensor. All sends/receives are issued between
// one ncclGroupStart()/ncclGroupEnd() pair.
//
// in_tensors / out_tensors: must satisfy CheckTensorsInCudaPlace().
// in_sizes / out_sizes: per-peer row counts. They are handed to
//   CheckSplitSizes() by pointer together with the tensor dims before use,
//   so that helper may modify them.
// Returns the task produced by Collective() for CommType::ALLTOALL_SINGLE.
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll_Single(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    std::vector<int64_t>& in_sizes,
    std::vector<int64_t>& out_sizes) {
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors),
      true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors),
      true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
  return Collective(
      in_tensors,
      out_tensors,
      [&](phi::DenseTensor& input,
          phi::DenseTensor& output,
          ncclComm_t comm,
          const gpuStream_t& stream) {
        PADDLE_ENFORCE_EQ(input.dtype() == output.dtype(),
                          true,
                          platform::errors::InvalidArgument(
                              "The dtypes of input and output must be equal."));

        std::vector<int64_t> in_dims = phi::vectorize(input.dims());
        std::vector<int64_t> out_dims = phi::vectorize(output.dims());
        CheckSplitSizes(&in_sizes, in_dims);
        CheckSplitSizes(&out_sizes, out_dims);

        // Number of elements in one leading-dimension row. A tensor with zero
        // rows would turn the division into a divide-by-zero, so its row size
        // is taken as 0 instead.
        const size_t in_row_size =
            in_dims[0] == 0 ? 0 : input.numel() / in_dims[0];
        const size_t out_row_size =
            out_dims[0] == 0 ? 0 : output.numel() / out_dims[0];

        // Running element offsets into the input and output buffers.
        size_t in_offset = 0;
        size_t out_offset = 0;

        // input and output dtypes were enforced equal above, so one NCCL
        // dtype is used for both directions.
        const auto nccl_dtype = platform::ToNCCLDataType(input.dtype());

        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupStart());
        for (auto i = 0; i < size_; i++) {
          const size_t in_length = in_sizes[i] * in_row_size;
          PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
              GetPointerByOffset(input.data(), in_offset, input.dtype()),
              in_length,
              nccl_dtype,
              i,
              comm,
              stream));
          in_offset += in_length;

          const size_t out_length = out_sizes[i] * out_row_size;
          PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
              GetPointerByOffset(output.data(), out_offset, input.dtype()),
              out_length,
              nccl_dtype,
              i,
              comm,
              stream));
          out_offset += out_length;
        }
        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupEnd());
      },
      CommType::ALLTOALL_SINGLE);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Reduce(
902
    std::vector<phi::DenseTensor>& in_tensors,
903 904
    std::vector<phi::DenseTensor>& out_tensors,
    const ReduceOptions& opts) {
905
  PADDLE_ENFORCE_EQ(
906 907
      CheckTensorsInCudaPlace(in_tensors),
      true,
908 909
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  return Collective(
910 911 912 913 914 915
      in_tensors,
      out_tensors,
      [&](const phi::DenseTensor& input,
          phi::DenseTensor& output,
          ncclComm_t comm,
          const gpuStream_t& stream) {
916
        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclReduce(
917 918 919
            input.data(),
            output.data(),
            input.numel(),
920
            platform::ToNCCLDataType(input.dtype()),
921 922 923 924
            ToNCCLRedType(opts.reduce_op),
            opts.root_rank,
            comm,
            stream));
925 926 927 928 929
      },
      CommType::REDUCE);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Scatter(
930
    std::vector<phi::DenseTensor>& in_tensors,
931 932
    std::vector<phi::DenseTensor>& out_tensors,
    const ScatterOptions& opts) {
933
  PADDLE_ENFORCE_EQ(
934 935
      CheckTensorsInCudaPlace(in_tensors),
      true,
936 937
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
938 939
      CheckTensorsInCudaPlace(out_tensors),
      true,
940 941
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  return Collective(
942 943 944 945 946
      in_tensors,
      out_tensors,
      [&](phi::DenseTensor& input,
          phi::DenseTensor& output,
          ncclComm_t comm,
947 948 949 950 951 952
          const gpuStream_t& stream) {
        size_t offset = 0;
        if (rank_ == opts.root_rank) {
          PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupStart());
          for (auto i = 0; i < size_; i++) {
            PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
953
                GetPointerByOffset(input.data(), offset, input.dtype()),
954 955 956 957 958
                input.numel() / size_,
                platform::ToNCCLDataType(input.dtype()),
                i,
                comm,
                stream));
959
            offset += input.numel() / size_;
960 961
          }
          PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
962 963 964 965 966
              output.data(),
              input.numel() / size_,
              platform::ToNCCLDataType(input.dtype()),
              opts.root_rank,
              comm,
967 968 969 970
              stream));
          PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupEnd());
        } else {
          PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
971 972 973 974 975
              output.data(),
              input.numel() / size_,
              platform::ToNCCLDataType(input.dtype()),
              opts.root_rank,
              comm,
976 977 978 979 980 981
              stream));
        }
      },
      CommType::SCATTER);
}

982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
// Reduce-scatter on a single input/output tensor pair via ncclReduceScatter.
//
// Requires out_tensor and in_tensor to share a dtype and
// in_tensor.numel() == out_tensor.numel() * size_. opts.reduce_op selects the
// reduction operator.
// Returns the task produced by Collective() for CommType::REDUCE_SCATTER.
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::_ReduceScatterBase(
    phi::DenseTensor& out_tensor,
    phi::DenseTensor& in_tensor,
    const ReduceScatterOptions& opts) {
  PADDLE_ENFORCE_EQ(
      out_tensor.dtype(),
      in_tensor.dtype(),
      platform::errors::InvalidArgument(
          "Input tensor and output tensor should be same dtype."));

  PADDLE_ENFORCE_EQ(
      out_tensor.numel() * size_,
      in_tensor.numel(),
      platform::errors::InvalidArgument("input tensor must be the same size as "
                                        "output tensor size times world_size"));

  // Collective() takes vectors, so wrap the single tensors.
  std::vector<phi::DenseTensor> inputs{in_tensor};
  std::vector<phi::DenseTensor> outputs{out_tensor};

  auto reduce_scatter_fn = [&](phi::DenseTensor& input,
                               phi::DenseTensor& output,
                               ncclComm_t comm,
                               const gpuStream_t& stream) {
    // With the stream-safe allocator enabled, record the output's holder
    // against the stream before launching the NCCL call.
    if (FLAGS_use_stream_safe_cuda_allocator) {
      platform::CUDADeviceGuard cuda_guard;
      cuda_guard.SetDevice(output.place());
      memory::RecordStream(output.Holder(), stream);
    }
    const auto nccl_dtype = platform::ToNCCLDataType(input.dtype());
    const auto nccl_op = ToNCCLRedType(opts.reduce_op);
    PADDLE_ENFORCE_GPU_SUCCESS(
        platform::dynload::ncclReduceScatter(input.data(),
                                             output.data(),
                                             output.numel(),
                                             nccl_dtype,
                                             nccl_op,
                                             comm,
                                             stream));
  };

  return Collective(
      inputs, outputs, reduce_scatter_fn, CommType::REDUCE_SCATTER);
}

// Calls ncclGroupStart() and enforces that it reports success.
void ProcessGroupNCCL::GroupStart() {
  const auto status = platform::dynload::ncclGroupStart();
  PADDLE_ENFORCE_GPU_SUCCESS(status);
}

// Calls ncclGroupEnd() and enforces that it reports success.
void ProcessGroupNCCL::GroupEnd() {
  const auto status = platform::dynload::ncclGroupEnd();
  PADDLE_ENFORCE_GPU_SUCCESS(status);
}

L
LiYuRio 已提交
1034 1035 1036 1037 1038 1039 1040 1041 1042 1043
// Looks up the NCCL communicator registered for `place`.
//
// The lookup key is GetKeyFromPlaces() applied to a one-element place list.
// Raises InvalidArgument when no entry exists in places_to_ncclcomm_;
// otherwise returns the communicator of the first manager stored under that
// key.
ncclComm_t ProcessGroupNCCL::NCCLComm(const Place& place) const {
  std::vector<Place> single_place{place};
  const auto comm_it =
      places_to_ncclcomm_.find(GetKeyFromPlaces(single_place));
  PADDLE_ENFORCE_NE(comm_it,
                    places_to_ncclcomm_.end(),
                    platform::errors::InvalidArgument(
                        "Cannot find nccl comm in process group."));
  return comm_it->second[0]->GetNcclComm();
}

1044 1045
}  //  namespace distributed
}  //  namespace paddle