// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"

#include <chrono>

#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"

constexpr int64_t kWaitBlockTimeout = 10;

namespace paddle {
namespace distributed {

using Place = paddle::platform::Place;
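
// ProcessGroupHeter composes two levels of communication: an inner process
// group (ProcessGroupNCCL on GPU or ProcessGroupHCCL on Ascend NPU) that runs
// collectives inside the local cluster, and an inter-cluster step that
// exchanges CPU copies of the tensors either through a ProcessGroupGloo group
// or through an external switch module reached via HeterClient.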
int ProcessGroupHeter::send_count = 0;
int ProcessGroupHeter::recv_count = 0;

std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
    int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
  return std::make_shared<ProcessGroupHeter::HeterTask>(rank, comm_type,
                                                        inputs);
}

ProcessGroupHeter::HeterTask::HeterTask(
    int rank, CommType CommType, const std::vector<phi::DenseTensor>& inputs)
    : Task(rank, inputs, CommType) {}

ProcessGroupHeter::HeterTask::~HeterTask() {}

bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }

// TODO(sheniang03): Add timeout support to Wait; the timeout parameter is
// currently unused.
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
  return true;
}

ProcessGroupHeter::ProcessGroupHeter(
    const std::shared_ptr<Store>& store, int rank, int size,
    const platform::Place& place, int gid, int local_rank, int local_size,
    int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint,
    int src_rank, int dst_rank)
    : ProcessGroup(rank, size, place, gid),
      store_(store),
      local_rank_(local_rank),
      local_size_(local_size),
      gloo_rank_(gloo_rank),
      gloo_size_(gloo_size),
      with_switch_(with_switch),
      switch_endpoint_(switch_endpoint),
      src_rank_(src_rank),
      dst_rank_(dst_rank) {
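  // Collectives inside the local cluster go through inner_pg_ (NCCL for GPU,
  // HCCL for Ascend). The Gloo-based inter_pg_ is only created on local rank 0
  // when no switch module is used; with a switch module, cross-cluster traffic
  // goes through HeterClient instead.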
#if defined(PADDLE_WITH_NCCL)
  inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
                                                 place_, IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
  inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
                                                 place_, IGNORE_ID);
#else
  PADDLE_THROW(platform::errors::Fatal(
      "ProcessGroupHeter only supports NCCL and HCCL now."));
#endif
  if (local_rank_ == 0 && !with_switch_) {
    auto opts = ProcessGroupGloo::GlooOptions::create();
    opts->device = ProcessGroupGloo::createDefaultDevice();
    inter_pg_ = std::make_shared<ProcessGroupGloo>(
        store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts);
  }
}

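// Element-wise in-place accumulation: dst[i] += src[i] for i in [0, size).
// Used to combine the local reduction result with the tensor received from
// the switch module.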
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    *dst += *src;
    dst++;
    src++;
  }
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do allreduce in inner cluster
  auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts);
  task->Wait();

  // Step2: copy tensors to CPU
  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    phi::DenseTensor cpu_tensor;
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    // Step3: do inter cluster allreduce
    if (with_switch_) {
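      // Switch path: local rank 0 sends its reduced tensor to the switch
      // module, receives the peer cluster's partial result back, and adds it
      // element-wise with _do_add. Note that only cpu_tensors[0] is exchanged
      // here.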
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        std::vector<int64_t> send_size;
        send_size.push_back(dense_cpu_tensor.numel());
        int ret = client_->Send(
            gid_, {dense_cpu_tensor.name()}, send_size, dense_cpu_tensor.data(),
            dense_cpu_tensor.numel() *
                framework::DataTypeSize(dense_cpu_tensor.dtype()));
        PADDLE_ENFORCE_EQ(ret, 0,
                          platform::errors::PreconditionNotMet(
                              "Send to the switch module error."));
        phi::DenseTensor cpu_tensor2;
        cpu_tensor2.AllocateFrom(
            std::make_unique<paddle::experimental::DefaultAllocator>(
                paddle::platform::CPUPlace())
                .get(),
            dense_cpu_tensor.dtype(), dense_cpu_tensor.numel());
        ret = client_->Recv(
            gid_, {dense_cpu_tensor.name()}, cpu_tensor2.data(),
            cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype()));
        PADDLE_ENFORCE_EQ(ret, 0,
                          platform::errors::PreconditionNotMet(
                              "Recv from the switch module error."));

        switch (dense_cpu_tensor.dtype()) {
          case DataType::FLOAT32:
            _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor.data()),
                           reinterpret_cast<float*>(cpu_tensor2.data()),
                           dense_cpu_tensor.numel());
            break;
          case DataType::FLOAT64:
            _do_add<double>(reinterpret_cast<double*>(dense_cpu_tensor.data()),
                            reinterpret_cast<double*>(cpu_tensor2.data()),
                            dense_cpu_tensor.numel());
            break;
          case DataType::INT32:
            _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor.data()),
                         reinterpret_cast<int*>(cpu_tensor2.data()),
                         dense_cpu_tensor.numel());
            break;
          default:
            PADDLE_THROW(platform::errors::PreconditionNotMet(
                "Unsupported data type (%s) to do add.",
                framework::DataType2String(dense_cpu_tensor.dtype())));
        }
      }
    } else {
      auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    // Step4: copy cpu tensors back to gpu
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }

  // Step5: broadcast among inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::ALLREDUCE, in_tensors);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const BroadcastOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do broadcast in inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  inner_pg_->Broadcast(in_tensors, out_tensors, b_opts);

  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      phi::DenseTensor cpu_tensor;
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    if (with_switch_) {
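      // Switch path: the cluster with gloo_rank_ == 0 pushes the tensor to the
      // switch module; the other cluster pulls it. Only cpu_tensors[0] is
      // exchanged.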
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        if (gloo_rank_ == 0) {
          std::vector<int64_t> send_size;
          send_size.push_back(dense_cpu_tensor.numel());
          int ret = client_->Send(
              gid_, {dense_cpu_tensor.name()}, send_size,
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0,
                            platform::errors::PreconditionNotMet(
                                "Send to the switch module error."));
        } else {
          int ret = client_->Recv(
              gid_, {dense_cpu_tensor.name()}, dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0,
                            platform::errors::PreconditionNotMet(
                                "Receive from the switch module error."));
        }
      }
    } else {
      auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
    std::vector<phi::DenseTensor>& in_tensors, int peer) {
  PADDLE_ENFORCE_EQ(
      in_tensors.size(), 1,
      platform::errors::PreconditionNotMet(
          "For each send operation, there can only be one tensor to send."));
  // Copy Tensor to cpu
  auto start = std::chrono::high_resolution_clock::now();
  phi::DenseTensor cpu_tensor;
  auto& gpu_tensor = in_tensors[0];
  framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
  PADDLE_ENFORCE_EQ(with_switch_, true,
                    platform::errors::PreconditionNotMet(
                        "Gloo does not support the send operation."));
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = end - start;
  VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
          << ") from gpu to cpu for send " << std::setw(9)
          << " is: " << diff.count() << " s" << std::endl;

  // Send to switch
  HeterClient* client_ =
      HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
  int64_t tensor_size =
      cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype());
  std::vector<int64_t> send_size;
  send_size.push_back(tensor_size);
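  // The tensor name encodes the group id, the (src, dst) rank pair, and a send
  // counter; Recv builds the same name on the destination side so the switch
  // module can match each Send with the corresponding Recv.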
  auto id = src_rank_ * 10000 + dst_rank_;
  std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
                            std::string("_") + std::to_string(send_count++);
  VLOG(2) << "tensor_name:" << tensor_name;
  int ret = client_->Send(gid_, {tensor_name}, send_size, cpu_tensor.data(),
                          tensor_size);
  PADDLE_ENFORCE_EQ(
      ret, 0,
      platform::errors::PreconditionNotMet("Send to the switch module error."));
  return CreateTask(rank_, CommType::SEND, in_tensors);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
    std::vector<phi::DenseTensor>& out_tensors, int peer) {
  PADDLE_ENFORCE_EQ(
      out_tensors.size(), 1,
      platform::errors::PreconditionNotMet(
          "For each rece operation, there can only be one tensor to receive."));

  // Copy Tensor to cpu
  phi::DenseTensor cpu_tensor;
  auto& gpu_tensor = out_tensors[0];
  cpu_tensor.Resize(gpu_tensor.dims());
  cpu_tensor.set_layout(gpu_tensor.layout());
  cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype());

  PADDLE_ENFORCE_EQ(with_switch_, true,
                    platform::errors::PreconditionNotMet(
                        "Gloo does not support the send operation."));
  // recv from switch
  HeterClient* client_ =
      HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
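  // Rebuild the same tensor name the sender used (group id, (src, dst) rank
  // pair, receive counter) so the switch module can pair this Recv with its
  // Send.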
  auto id = src_rank_ * 10000 + dst_rank_;
  std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
                            std::string("_") + std::to_string(recv_count++);
  VLOG(2) << "tensor_name: " << tensor_name;
  auto start = std::chrono::high_resolution_clock::now();
  int ret = client_->Recv(
      gid_, {tensor_name}, cpu_tensor.data(),
      cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()));
  PADDLE_ENFORCE_EQ(ret, 0,
                    platform::errors::PreconditionNotMet(
                        "Receive from the switch module error."));
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = end - start;
  double goodput = cpu_tensor.numel() *
                   framework::DataTypeSize(cpu_tensor.dtype()) / diff.count();
  VLOG(2) << "Goodput: " << goodput << "B/s" << std::endl;
  start = std::chrono::high_resolution_clock::now();
  framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
  end = std::chrono::high_resolution_clock::now();
  diff = end - start;
  VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
          << ") from cpu to gpu for recv " << std::setw(9)
          << " is: " << diff.count() << " s" << std::endl;
  return CreateTask(rank_, CommType::RECV, out_tensors);
}

}  // namespace distributed
}  // namespace paddle