// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"

constexpr int64_t kWaitBlockTImeout = 10;

namespace paddle {
namespace distributed {

using Place = paddle::platform::Place;

std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
    int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
  return std::make_shared<ProcessGroupHeter::HeterTask>(rank, comm_type,
                                                        inputs);
}

ProcessGroupHeter::HeterTask::HeterTask(
    int rank, CommType CommType, const std::vector<phi::DenseTensor>& inputs)
    : Task(rank, inputs, CommType) {}

ProcessGroupHeter::HeterTask::~HeterTask() {}

bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }

// TODO(sheniang03): Add timeout support for Wait; the timeout argument is currently unused.
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
  return true;
}

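// A ProcessGroupHeter combines an intra-cluster process group (NCCL on GPU or
// HCCL on NPU) with an inter-cluster channel: either a Gloo process group or a
// remote "switch" service reached through HeterClient.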
ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
                                     int rank, int size, int gid,
                                     int local_rank, int local_size,
                                     int gloo_rank, int gloo_size,
                                     bool with_switch,
                                     std::string switch_endpoint)
    : ProcessGroup(rank, size, gid),
      store_(store),
      local_rank_(local_rank),
      local_size_(local_size),
      gloo_rank_(gloo_rank),
      gloo_size_(gloo_size),
      with_switch_(with_switch),
      switch_endpoint_(switch_endpoint) {
#if defined(PADDLE_WITH_NCCL)
  inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
                                                 IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
  inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
                                                 IGNORE_ID);
#else
  PADDLE_THROW(platform::errors::Fatal(
      "ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
  if (local_rank_ == 0 && !with_switch_) {
    auto opts = ProcessGroupGloo::GlooOptions::create();
    opts->device = ProcessGroupGloo::createDefaultDevice();
    inter_pg_ = std::make_shared<ProcessGroupGloo>(store, gloo_rank_,
                                                   gloo_size_, IGNORE_ID, opts);
  }
}

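// Element-wise accumulation helper used to merge the tensor received from the
// switch module into the locally reduced tensor.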
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    *dst += *src;
    dst++;
    src++;
  }
}

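// Hierarchical allreduce: reduce inside the local cluster, move the result to
// CPU on the local root, combine results across clusters (via the switch
// module or Gloo), then broadcast the final tensors back inside the cluster.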
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do allreduce in inner cluster
  auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts);
  task->Wait();

  // Step2: copy tensors to CPU
  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.resize(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto& gpu_tensor = in_tensors[i];
      auto& cpu_tensor = cpu_tensors[i];
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
    }
    // Step3: do inter cluster allreduce
    if (with_switch_) {
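      // Send the locally reduced tensor to the switch module, receive the
      // peer clusters' result, and accumulate it into the local buffer.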
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        std::vector<int> send_size;
        send_size.push_back(dense_cpu_tensor.numel());
        int ret = client_->Send(
            gid_, {dense_cpu_tensor.name()}, send_size, dense_cpu_tensor.data(),
            dense_cpu_tensor.numel() *
                framework::DataTypeSize(dense_cpu_tensor.dtype()));
        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                      "Send to the switch module error."));
        phi::DenseTensorMeta meta = phi::DenseTensorMeta(
            dense_cpu_tensor.dtype(), dense_cpu_tensor.dims());
        std::shared_ptr<phi::DenseTensor> dense_cpu_tensor2 =
            std::make_shared<phi::DenseTensor>(
                std::make_unique<paddle::experimental::DefaultAllocator>(
                    paddle::platform::CPUPlace())
                    .get(),
                meta);
        dense_cpu_tensor2->ResizeAndAllocate(dense_cpu_tensor.dims());
        ret = client_->Recv(
            gid_, {dense_cpu_tensor.name()}, dense_cpu_tensor2->data(),
            dense_cpu_tensor2->numel() *
                framework::DataTypeSize(dense_cpu_tensor2->dtype()));
        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                      "Recv from the switch module error."));

        switch (dense_cpu_tensor.dtype()) {
          case DataType::FLOAT32:
            _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor.data()),
                           reinterpret_cast<float*>(dense_cpu_tensor2->data()),
                           dense_cpu_tensor.numel());
            break;
          case DataType::FLOAT64:
            _do_add<double>(
                reinterpret_cast<double*>(dense_cpu_tensor.data()),
                reinterpret_cast<double*>(dense_cpu_tensor2->data()),
                dense_cpu_tensor.numel());
            break;
          case DataType::INT32:
            _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor.data()),
                         reinterpret_cast<int*>(dense_cpu_tensor2->data()),
                         dense_cpu_tensor.numel());
            break;
          default:
            PADDLE_THROW(platform::errors::PreconditionNotMet(
                "Unsupported data type (%s) to do add.",
                framework::DataType2String(dense_cpu_tensor.dtype())));
        }
      }
    } else {
      auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    // Step4: copy cpu tensors to gpu
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto& gpu_tensor = out_tensors[i];
      auto& cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }

  // Step5: broadcast among inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::ALLREDUCE, in_tensors);
}

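// Hierarchical broadcast: broadcast inside the local cluster, copy the data to
// CPU on the local root, relay it across clusters (via the switch module or
// Gloo), copy it back to GPU, and broadcast again inside the cluster.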
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const BroadcastOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do broadcast in inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  inner_pg_->Broadcast(in_tensors, out_tensors, b_opts);

  if (local_rank_ == 0) {
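    // Copy the tensors to CPU on the local root before the inter-cluster step.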
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.resize(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto& gpu_tensor = in_tensors[i];
      auto& cpu_tensor = cpu_tensors[i];
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
    }
    if (with_switch_) {
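      // The cluster whose gloo_rank_ is 0 pushes the tensor to the switch
      // module; the other clusters' roots pull it from there.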
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        if (gloo_rank_ == 0) {
          std::vector<int> send_size;
          send_size.push_back(dense_cpu_tensor.numel());
          int ret = client_->Send(
              gid_, {dense_cpu_tensor.name()}, send_size,
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                        "Send to the switch module error."));
        } else {
          int ret = client_->Recv(
              gid_, {dense_cpu_tensor.name()}, dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0,
                            platform::errors::PreconditionNotMet(
                                "Receive from the switch module error."));
          ret = client_->Recv(
              gid_, {dense_cpu_tensor.name()}, dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0,
                            platform::errors::PreconditionNotMet(
                                "Receive from the switch module error."));
        }
      }
    } else {
      auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto& gpu_tensor = out_tensors[i];
      auto& cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }
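  // Broadcast the received tensors to the other ranks in the inner cluster.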
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}

}  // namespace distributed
}  // namespace paddle
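
// A minimal usage sketch, assuming an already-initialized Store named `store`,
// preallocated GPU tensors `ins` and `outs`, and placeholder ranks, sizes, and
// endpoint values; these names are illustrative only.
//
//   auto pg = std::make_shared<paddle::distributed::ProcessGroupHeter>(
//       store, /*rank=*/0, /*size=*/16, /*gid=*/0, /*local_rank=*/0,
//       /*local_size=*/8, /*gloo_rank=*/0, /*gloo_size=*/2,
//       /*with_switch=*/false, /*switch_endpoint=*/"");
//   auto task = pg->AllReduce(ins, outs,
//                             paddle::distributed::AllreduceOptions());
//   task->Wait();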