// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"

constexpr int64_t kWaitBlockTImeout = 10;

namespace paddle {
namespace distributed {

using Place = paddle::platform::Place;
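
// ProcessGroupHeter runs collectives in two levels: an intra-cluster group
// (inner_pg_, NCCL on GPU or HCCL on Ascend NPU) and an inter-cluster
// exchange driven by each cluster's local rank 0, performed either through a
// Gloo group (inter_pg_) or through an external "switch" service reached via
// HeterClient. The combined result is then broadcast back to the local ranks.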

std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
    int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
  return std::make_shared<ProcessGroupHeter::HeterTask>(rank, comm_type,
                                                        inputs);
}

ProcessGroupHeter::HeterTask::HeterTask(
    int rank, CommType CommType, const std::vector<phi::DenseTensor>& inputs)
    : Task(rank, inputs, CommType) {}

ProcessGroupHeter::HeterTask::~HeterTask() {}

bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }

// TODO(sheniang03): Add timeout for wait, now timeout unused
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
  return true;
}

ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
                                     int rank, int size, int gid,
                                     int local_rank, int local_size,
                                     int gloo_rank, int gloo_size,
                                     bool with_switch,
                                     std::string switch_endpoint)
    : ProcessGroup(rank, size, gid),
      store_(store),
      local_rank_(local_rank),
      local_size_(local_size),
      gloo_rank_(gloo_rank),
      gloo_size_(gloo_size),
      with_switch_(with_switch),
      switch_endpoint_(switch_endpoint) {
#if defined(PADDLE_WITH_NCCL)
  inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
                                                 IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
  inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
                                                 IGNORE_ID);
#else
  PADDLE_THROW(platform::errors::Fatal(
      "ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
  if (local_rank_ == 0 && !with_switch_) {
    auto opts = ProcessGroupGloo::GlooOptions::create();
    opts->device = ProcessGroupGloo::createDefaultDevice();
    inter_pg_ = std::make_shared<ProcessGroupGloo>(store, gloo_rank_,
                                                   gloo_size_, IGNORE_ID, opts);
  }
}
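
// A usage sketch with hypothetical values (rank, size, gid, local_rank,
// local_size, gloo_rank and gloo_size stand for the caller's own values,
// `store` for an existing std::shared_ptr<Store>, and in_tensors/out_tensors
// for std::vector<phi::DenseTensor> living on the device):
//
//   auto pg = std::make_shared<ProcessGroupHeter>(
//       store, rank, size, gid, local_rank, local_size, gloo_rank, gloo_size,
//       /*with_switch=*/false, /*switch_endpoint=*/"");
//   auto task = pg->AllReduce(in_tensors, out_tensors, AllreduceOptions());
//   task->Wait();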

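// Element-wise in-place accumulation on host buffers. It is used on the
// switch path below to add the partial sum received from the peer clusters
// into the locally reduced tensor.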
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    *dst += *src;
    dst++;
    src++;
  }
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do allreduce in inner cluster
  auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts);
  task->Wait();

  // Step2: copy tensors to CPU
  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      // Declare the staging tensor inside the loop so that each entry of
      // cpu_tensors owns its own host buffer instead of aliasing one tensor.
      phi::DenseTensor cpu_tensor;
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    // Step3: do inter cluster allreduce
    if (with_switch_) {
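      // Switch path: local rank 0 sends its reduced CPU tensor to the switch
      // service, receives the peer clusters' partial sum into a scratch
      // buffer, and accumulates it element-wise. Note that only
      // cpu_tensors[0] is exchanged on this path.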
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        std::vector<int> send_size;
        send_size.push_back(dense_cpu_tensor.numel());
        int ret = client_->Send(
            gid_, {dense_cpu_tensor.name()}, send_size, dense_cpu_tensor.data(),
            dense_cpu_tensor.numel() *
                framework::DataTypeSize(dense_cpu_tensor.dtype()));
        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                      "Send to the switch module error."));
        phi::DenseTensor cpu_tensor2;
        cpu_tensor2.AllocateFrom(
            std::make_unique<paddle::experimental::DefaultAllocator>(
                paddle::platform::CPUPlace())
                .get(),
            dense_cpu_tensor.dtype(), dense_cpu_tensor.numel());
        ret = client_->Recv(
            gid_, {dense_cpu_tensor.name()}, cpu_tensor2.data(),
            cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype()));
        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                      "Recv from the switch module error."));

        switch (dense_cpu_tensor.dtype()) {
          case DataType::FLOAT32:
            _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor.data()),
                           reinterpret_cast<float*>(cpu_tensor2.data()),
                           dense_cpu_tensor.numel());
            break;
          case DataType::FLOAT64:
            _do_add<double>(reinterpret_cast<double*>(dense_cpu_tensor.data()),
                            reinterpret_cast<double*>(cpu_tensor2.data()),
                            dense_cpu_tensor.numel());
            break;
          case DataType::INT32:
            _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor.data()),
                         reinterpret_cast<int*>(cpu_tensor2.data()),
                         dense_cpu_tensor.numel());
            break;
          default:
            PADDLE_THROW(platform::errors::PreconditionNotMet(
                "Unsupported data type (%s) to do add.",
                framework::DataType2String(dense_cpu_tensor.dtype())));
        }
      }
    } else {
      auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    // Step4: copy cpu tensors to gpu
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }

  // Step5: broadcast among inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::ALLREDUCE, in_tensors);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const BroadcastOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(in_tensors), true,
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
#endif

  // Step1: do broadcast in inner cluster
  auto b_opts = BroadcastOptions();
  b_opts.source_rank = 0;
  inner_pg_->Broadcast(in_tensors, out_tensors, b_opts);

  if (local_rank_ == 0) {
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      phi::DenseTensor cpu_tensor;
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
      cpu_tensors.push_back(cpu_tensor);
    }
    if (with_switch_) {
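      // Switch path: the cluster whose gloo_rank_ is 0 sends cpu_tensors[0]
      // to the switch service; every other cluster receives it from the
      // switch. As in AllReduce, only the first tensor is exchanged.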
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
        auto dense_cpu_tensor = cpu_tensors[0];
        if (gloo_rank_ == 0) {
          std::vector<int> send_size;
          send_size.push_back(dense_cpu_tensor.numel());
          int ret = client_->Send(
              gid_, {dense_cpu_tensor.name()}, send_size,
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                        "Send to the switch module error."));
        } else {
          int ret = client_->Recv(
              gid_, {dense_cpu_tensor.name()}, dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
          PADDLE_ENFORCE_EQ(ret, 0,
                            platform::errors::PreconditionNotMet(
                                "Receive from the switch module error."));
        }
      }
    } else {
      auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts);
      gloo_task->Wait();
    }
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
    }
  }
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
  broadcast_task->Wait();
  return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}

}  // namespace distributed
}  // namespace paddle