ProcessGroupHeter.cc 9.9 KB
Newer Older
L
lilong12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
#include "paddle/phi/common/place.h"

constexpr int64_t kWaitBlockTImeout = 10;

namespace paddle {
namespace distributed {

using Place = paddle::platform::Place;

std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
29
    int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
L
lilong12 已提交
30 31 32 33
  return std::make_shared<ProcessGroupHeter::HeterTask>(rank, comm_type,
                                                        inputs);
}

34 35
ProcessGroupHeter::HeterTask::HeterTask(
    int rank, CommType CommType, const std::vector<phi::DenseTensor>& inputs)
L
lilong12 已提交
36 37 38 39 40 41 42 43 44 45 46
    : Task(rank, inputs, CommType) {}

ProcessGroupHeter::HeterTask::~HeterTask() {}

bool ProcessGroupHeter::HeterTask::IsCompleted() { return true; }

// TODO(sheniang03): Add timeout for wait, now timeout unused
bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
  return true;
}

47 48 49 50 51
ProcessGroupHeter::ProcessGroupHeter(
    const std::shared_ptr<Store>& store, int rank, int size,
    const platform::Place& place, int gid, int local_rank, int local_size,
    int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint)
    : ProcessGroup(rank, size, place, gid),
L
lilong12 已提交
52 53 54 55 56
      store_(store),
      local_rank_(local_rank),
      local_size_(local_size),
      gloo_rank_(gloo_rank),
      gloo_size_(gloo_size),
57 58
      with_switch_(with_switch),
      switch_endpoint_(switch_endpoint) {
L
lilong12 已提交
59 60
#if defined(PADDLE_WITH_NCCL)
  inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
61
                                                 place_, IGNORE_ID);
L
lilong12 已提交
62 63
#elif defined(PADDLE_WITH_ASCEND_CL)
  inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
64
                                                 place_, IGNORE_ID);
L
lilong12 已提交
65
#else
66
  PADDLE_THROW(platform::errors::Fatal(
L
lilong12 已提交
67 68
      "ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
69
  if (local_rank_ == 0 && !with_switch_) {
L
lilong12 已提交
70 71
    auto opts = ProcessGroupGloo::GlooOptions::create();
    opts->device = ProcessGroupGloo::createDefaultDevice();
72 73
    inter_pg_ = std::make_shared<ProcessGroupGloo>(
        store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts);
L
lilong12 已提交
74 75 76
  }
}

77 78 79 80 81 82 83 84 85
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    *dst += *src;
    dst++;
    src++;
  }
}

L
lilong12 已提交
86
std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
87 88
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const AllreduceOptions& opts) {
L
lilong12 已提交
89 90
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
91
      CheckTensorsInCudaPlace(in_tensors), true,
L
lilong12 已提交
92
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
93 94 95
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
L
lilong12 已提交
96 97 98
#endif

  // Step1: do allreduce in inner cluster
99
  auto task = inner_pg_->AllReduce(in_tensors, in_tensors, opts);
L
lilong12 已提交
100 101 102 103
  task->Wait();

  // Step2: copy tensors to CPU
  if (local_rank_ == 0) {
104 105
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
L
lilong12 已提交
106
    phi::DenseTensor cpu_tensor;
107 108 109 110
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
L
lilong12 已提交
111
      cpu_tensors.push_back(cpu_tensor);
L
lilong12 已提交
112 113 114
    }
    // Step3: do inter cluster allreduce
    if (with_switch_) {
115 116 117
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
118
        auto dense_cpu_tensor = cpu_tensors[0];
119
        std::vector<int64_t> send_size;
120
        send_size.push_back(dense_cpu_tensor.numel());
121
        int ret = client_->Send(
122 123 124
            gid_, {dense_cpu_tensor.name()}, send_size, dense_cpu_tensor.data(),
            dense_cpu_tensor.numel() *
                framework::DataTypeSize(dense_cpu_tensor.dtype()));
125 126
        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                      "Send to the switch module error."));
L
lilong12 已提交
127 128 129 130 131 132
        phi::DenseTensor cpu_tensor2;
        cpu_tensor2.AllocateFrom(
            std::make_unique<paddle::experimental::DefaultAllocator>(
                paddle::platform::CPUPlace())
                .get(),
            dense_cpu_tensor.dtype(), dense_cpu_tensor.numel());
133
        ret = client_->Recv(
L
lilong12 已提交
134 135
            gid_, {dense_cpu_tensor.name()}, cpu_tensor2.data(),
            cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype()));
136 137 138
        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                      "Recv from the switch module error."));

139
        switch (dense_cpu_tensor.dtype()) {
140
          case DataType::FLOAT32:
141
            _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor.data()),
L
lilong12 已提交
142
                           reinterpret_cast<float*>(cpu_tensor2.data()),
143
                           dense_cpu_tensor.numel());
144 145
            break;
          case DataType::FLOAT64:
L
lilong12 已提交
146 147 148
            _do_add<double>(reinterpret_cast<double*>(dense_cpu_tensor.data()),
                            reinterpret_cast<double*>(cpu_tensor2.data()),
                            dense_cpu_tensor.numel());
149 150
            break;
          case DataType::INT32:
151
            _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor.data()),
L
lilong12 已提交
152
                         reinterpret_cast<int*>(cpu_tensor2.data()),
153
                         dense_cpu_tensor.numel());
154 155 156 157
            break;
          default:
            PADDLE_THROW(platform::errors::PreconditionNotMet(
                "Unsupported data type (%s) to do add.",
158
                framework::DataType2String(dense_cpu_tensor.dtype())));
159 160
        }
      }
L
lilong12 已提交
161
    } else {
162
      auto gloo_task = inter_pg_->AllReduce(cpu_tensors, cpu_tensors, opts);
L
lilong12 已提交
163 164 165 166
      gloo_task->Wait();
    }
    // Step4: copy cpu tensors to gpu
    // copy cpu tensors to gpu
167 168 169 170
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, cpu_tensor.place(), &gpu_tensor);
L
lilong12 已提交
171 172 173 174 175
    }
  }

  // Step5: broadcast among inner cluster
  auto b_opts = BroadcastOptions();
176 177
  b_opts.source_rank = 0;
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
L
lilong12 已提交
178
  broadcast_task->Wait();
179
  return CreateTask(rank_, CommType::ALLREDUCE, in_tensors);
L
lilong12 已提交
180 181 182
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
183 184
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors, const BroadcastOptions& opts) {
L
lilong12 已提交
185 186
#if defined(PADDLE_WITH_NCCL)
  PADDLE_ENFORCE_EQ(
187
      CheckTensorsInCudaPlace(in_tensors), true,
L
lilong12 已提交
188
      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
189 190 191
  PADDLE_ENFORCE_EQ(
      CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
L
lilong12 已提交
192 193 194 195
#endif

  // Step1: do broadcast in inner cluster
  auto b_opts = BroadcastOptions();
196 197
  b_opts.source_rank = 0;
  inner_pg_->Broadcast(in_tensors, out_tensors, b_opts);
L
lilong12 已提交
198 199

  if (local_rank_ == 0) {
200 201 202 203
    std::vector<phi::DenseTensor> cpu_tensors;
    cpu_tensors.reserve(in_tensors.size());
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = in_tensors[i];
L
lilong12 已提交
204
      phi::DenseTensor cpu_tensor;
205 206
      cpu_tensor.Resize(gpu_tensor.dims());
      framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
L
lilong12 已提交
207
      cpu_tensors.push_back(cpu_tensor);
L
lilong12 已提交
208 209
    }
    if (with_switch_) {
210 211 212
      if (local_rank_ == 0) {
        HeterClient* client_ =
            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
213
        auto dense_cpu_tensor = cpu_tensors[0];
214
        if (gloo_rank_ == 0) {
215
          std::vector<int64_t> send_size;
216
          send_size.push_back(dense_cpu_tensor.numel());
217
          int ret = client_->Send(
218 219 220 221
              gid_, {dense_cpu_tensor.name()}, send_size,
              dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
222 223 224 225
          PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                        "Send to the switch module error."));
        } else {
          int ret = client_->Recv(
226 227 228
              gid_, {dense_cpu_tensor.name()}, dense_cpu_tensor.data(),
              dense_cpu_tensor.numel() *
                  framework::DataTypeSize(dense_cpu_tensor.dtype()));
229 230 231 232 233
          PADDLE_ENFORCE_EQ(ret, 0,
                            platform::errors::PreconditionNotMet(
                                "Receive from the switch module error."));
        }
      }
L
lilong12 已提交
234
    } else {
235
      auto gloo_task = inter_pg_->Broadcast(cpu_tensors, cpu_tensors, opts);
L
lilong12 已提交
236 237
      gloo_task->Wait();
    }
238 239 240 241
    for (size_t i = 0; i < in_tensors.size(); i++) {
      auto gpu_tensor = out_tensors[i];
      auto cpu_tensor = cpu_tensors[i];
      framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
L
lilong12 已提交
242 243
    }
  }
244
  auto broadcast_task = inner_pg_->Broadcast(out_tensors, out_tensors, b_opts);
L
lilong12 已提交
245
  broadcast_task->Wait();
246
  return CreateTask(rank_, CommType::BROADCAST, in_tensors);
L
lilong12 已提交
247 248
}

249 250
}  // namespace distributed
}  // namespace paddle