ProcessGroupHeter.cc

// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#include <chrono>
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"

constexpr int64_t kWaitBlockTImeout = 10;

namespace paddle {
namespace distributed {

using Place = paddle::platform::Place;
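
// Monotonically increasing counters used to build unique tensor names for the
// point-to-point Send/Recv calls that go through the switch service (see
// ProcessGroupHeter::Send and ProcessGroupHeter::Recv below).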
int ProcessGroupHeter::send_count = 0;
int ProcessGroupHeter::recv_count = 0;

std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
    int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
  return std::make_shared<ProcessGroupHeter::HeterTask>(rank, comm_type,
                                                        inputs);
}

ProcessGroupHeter::HeterTask::HeterTask(
    int rank, CommType CommType, const std::vector<phi::DenseTensor>& inputs)
    : Task(rank, inputs, CommType) {}

ProcessGroupHeter::HeterTask::~HeterTask() {}

bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }

// TODO(sheniang03): Add timeout handling to Wait; the timeout argument is
// currently unused.
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
  return true;
}

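// ProcessGroupHeter composes two levels of communication: an inner process
// group (NCCL on GPU, HCCL on Ascend) for the ranks inside one cluster, and an
// inter-cluster path that is either a Gloo process group or, when
// with_switch_ is set, a remote switch service reached through HeterClient.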
ProcessGroupHeter::ProcessGroupHeter(
    const std::shared_ptr<Store>& store, int rank, int size,
    const platform::Place& place, int gid, int local_rank, int local_size,
    int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint,
    int src_rank, int dst_rank)
    : ProcessGroup(rank, size, place, gid),
      store_(store),
      local_rank_(local_rank),
      local_size_(local_size),
      gloo_rank_(gloo_rank),
      gloo_size_(gloo_size),
      with_switch_(with_switch),
      switch_endpoint_(switch_endpoint),
      src_rank_(src_rank),
      dst_rank_(dst_rank) {
#if defined(PADDLE_WITH_NCCL)
  inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
                                                 place_, IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
  inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
                                                 place_, IGNORE_ID);
#else
  PADDLE_THROW(platform::errors::Fatal(
      "ProcessGroupHeter only supports NCCL and HCCL now."));
#endif
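  // Only the leader of each cluster (local_rank_ == 0) joins the inter-cluster
  // Gloo group, and only when the switch service is not used.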
  if (local_rank_ == 0 && !with_switch_) {
    auto opts = ProcessGroupGloo::GlooOptions::create();
    opts->device = ProcessGroupGloo::createDefaultDevice();
    inter_pg_ = std::make_shared<ProcessGroupGloo>(
        store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts);
  }
}

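// Element-wise in-place add: dst[i] += src[i]. Used below to accumulate the
// tensor received from the switch into the locally reduced CPU tensor.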
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    *dst += *src;
    dst++;
    src++;
  }
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do allreduce in inner cluster
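  // The inner allreduce writes its result back into in_tensors; the combined
  // result only reaches out_tensors in Step4/Step5.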
  auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts);
  task->Wait();

  // Step2: copy tensors to CPU
  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      // Stage each GPU tensor in its own CPU buffer; declaring the staging
      // tensor inside the loop keeps the copies stored in cpu_tensors from
      // aliasing one shared allocation.
      phi::DenseTensor cpu_tensor;
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    // Step3: do inter cluster allreduce
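    // With a switch service, rank 0 pushes its reduced tensor to the switch,
    // pulls a tensor back, and accumulates it into the local buffer with
    // _do_add; otherwise the inter-cluster Gloo group performs the allreduce.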
    if (with_switch_) {
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        std::vector<int64_t> send_size;
        send_size.push_back(dense_cpu_tensor.numel());
        int ret = client_->Send(
            gid_, {dense_cpu_tensor.name()}, send_size, dense_cpu_tensor.data(),
            dense_cpu_tensor.numel() *
                framework::DataTypeSize(dense_cpu_tensor.dtype()));
        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                      "Send to the switch module error."));
        phi::DenseTensor cpu_tensor2;
        // Give the receive buffer the same shape as the sent tensor so that
        // cpu_tensor2.numel() below reflects the real payload size.
        cpu_tensor2.Resize(dense_cpu_tensor.dims());
        cpu_tensor2.AllocateFrom(
            std::make_unique<paddle::experimental::DefaultAllocator>(
                paddle::platform::CPUPlace())
                .get(),
            dense_cpu_tensor.dtype(), dense_cpu_tensor.numel());
        ret = client_->Recv(
            gid_, {dense_cpu_tensor.name()}, cpu_tensor2.data(),
            cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype()));
        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                      "Recv from the switch module error."));

        switch (dense_cpu_tensor.dtype()) {
          case DataType::FLOAT32:
            _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor.data()),
                           reinterpret_cast<float*>(cpu_tensor2.data()),
                           dense_cpu_tensor.numel());
            break;
          case DataType::FLOAT64:
            _do_add<double>(reinterpret_cast<double*>(dense_cpu_tensor.data()),
                            reinterpret_cast<double*>(cpu_tensor2.data()),
                            dense_cpu_tensor.numel());
            break;
          case DataType::INT32:
            _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor.data()),
                         reinterpret_cast<int*>(cpu_tensor2.data()),
                         dense_cpu_tensor.numel());
            break;
          default:
            PADDLE_THROW(platform::errors::PreconditionNotMet(
                "Unsupported data type (%s) to do add.",
                framework::DataType2String(dense_cpu_tensor.dtype())));
        }
      }
    } else {
      auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    // Step4: copy cpu tensors back to gpu
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }

  // Step5: broadcast among inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::ALLREDUCE, in_tensors);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const BroadcastOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do broadcast in inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  inner_pg_->Broadcast(in_tensors, out_tensors, b_opts);

  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      phi::DenseTensor cpu_tensor;
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    if (with_switch_) {
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
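        // Through the switch, the cluster whose gloo_rank_ is 0 uploads the
        // tensor and every other cluster downloads it.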
        if (gloo_rank_ == 0) {
          std::vector<int64_t> send_size;
          send_size.push_back(dense_cpu_tensor.numel());
          int ret = client_->Send(
              gid_, {dense_cpu_tensor.name()}, send_size,
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                        "Send to the switch module error."));
        } else {
          int ret = client_->Recv(
              gid_, {dense_cpu_tensor.name()}, dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0,
                            platform::errors::PreconditionNotMet(
                                "Receive from the switch module error."));
        }
      }
    } else {
      auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
    std::vector<phi::DenseTensor>& in_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif

  PADDLE_ENFORCE_EQ(
      in_tensors.size(), 1,
      platform::errors::PreconditionNotMet(
          "For each send operation, there can only be one tensor to send."));
  // Copy Tensor to cpu
  auto start = std::chrono::high_resolution_clock::now();
  phi::DenseTensor cpu_tensor;
  auto& gpu_tensor = in_tensors[0];
  framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
  PADDLE_ENFORCE_EQ(with_switch_, true,
                    platform::errors::PreconditionNotMet(
                        "Gloo does not support the send operation."));
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = end - start;
  VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
          << ") from gpu to cpu for send " << std::setw(9)
          << " is: " << diff.count() << " s" << std::endl;

  // Send to switch
  HeterClient* client_ =
      HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
  int64_t tensor_size =
      cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype());
  std::vector<int64_t> send_size;
  send_size.push_back(tensor_size);
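  // Encode (group id, src rank, dst rank, message index) into the tensor name
  // so the matching Recv on the peer asks the switch for the same key.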
  auto id = src_rank_ * 10000 + dst_rank_;
  std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
                            std::string("_") + std::to_string(send_count++);
  VLOG(2) << "tensor_name:" << tensor_name;
  int ret = client_->Send(gid_, {tensor_name}, send_size, cpu_tensor.data(),
                          tensor_size);
  PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                "Send to the switch module error."));
  return CreateTask(rank_, CommType::SEND, in_tensors);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
    std::vector<phi::DenseTensor>& out_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  PADDLE_ENFORCE_EQ(
      out_tensors.size(), 1,
      platform::errors::PreconditionNotMet(
          "For each recv operation, there can only be one tensor to receive."));

  // Copy Tensor to cpu
  phi::DenseTensor cpu_tensor;
  auto& gpu_tensor = out_tensors[0];
  cpu_tensor.Resize(gpu_tensor.dims());
  cpu_tensor.set_layout(gpu_tensor.layout());
  cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype());

  PADDLE_ENFORCE_EQ(with_switch_, true,
                    platform::errors::PreconditionNotMet(
                        "Gloo does not support the recv operation."));
  // recv from switch
  HeterClient* client_ =
      HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
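  // Rebuild the same tensor name the sender used: same group id, same
  // (src, dst) pair, and the same per-pair message counter.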
  auto id = src_rank_ * 10000 + dst_rank_;
  std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
                            std::string("_") + std::to_string(recv_count++);
  VLOG(2) << "tensor_name: " << tensor_name;
  auto start = std::chrono::high_resolution_clock::now();
  int ret = client_->Recv(
      gid_, {tensor_name}, cpu_tensor.data(),
      cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()));
  PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                "Recv from the switch module error."));
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = end - start;
  double goodput = cpu_tensor.numel() *
                   framework::DataTypeSize(cpu_tensor.dtype()) / diff.count();
  VLOG(2) << "Goodput: " << goodput << " B/s" << std::endl;
  start = std::chrono::high_resolution_clock::now();
  framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
  end = std::chrono::high_resolution_clock::now();
  diff = end - start;
  VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
          << ") from cpu to gpu for recv " << std::setw(9)
          << " is: " << diff.count() << " s" << std::endl;
  return CreateTask(rank_, CommType::RECV, out_tensors);
}

}  // namespace distributed
}  // namespace paddle