// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <array>
#include <iostream>

#ifdef _WIN32
#include <gloo/common/win.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#endif

#include <gloo/allgather.h>
#include <gloo/allreduce.h>
#include <gloo/barrier.h>
#include <gloo/broadcast.h>
#include <gloo/reduce.h>
#include <gloo/scatter.h>

#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace distributed {

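// GENERATE_FUNC dispatches on a runtime experimental::DataType by expanding
// to a switch that instantiates func<T>(...) with the matching C++ type.
// Two variants are kept: the Windows build forwards arguments through
// __VA_ARGS__, while the POSIX build uses the GNU named-variadic `args...`
// extension.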
#ifdef _WIN32
#define GENERATE_FUNC(type, func, ...)       \
  switch (type) {                            \
    case experimental::DataType::FLOAT32:    \
      func<float>(__VA_ARGS__);              \
      break;                                 \
    case experimental::DataType::FLOAT64:    \
      func<double>(__VA_ARGS__);             \
      break;                                 \
    case experimental::DataType::FLOAT16:    \
      func<gloo::float16>(__VA_ARGS__);      \
      break;                                 \
    case experimental::DataType::INT32:      \
      func<int32_t>(__VA_ARGS__);            \
      break;                                 \
    case experimental::DataType::INT64:      \
      func<int64_t>(__VA_ARGS__);            \
      break;                                 \
    case experimental::DataType::INT8:       \
      func<int8_t>(__VA_ARGS__);             \
      break;                                 \
    case experimental::DataType::UINT8:      \
      func<uint8_t>(__VA_ARGS__);            \
      break;                                 \
    case experimental::DataType::BOOL:       \
      func<bool>(__VA_ARGS__);               \
      break;                                 \
    default:                                 \
      VLOG(0) << "Error: Unknown DataType."; \
      exit(-1);                              \
  }

#define HOST_NAME_MAX 256

#else
#define GENERATE_FUNC(type, func, args...)   \
  switch (type) {                            \
    case experimental::DataType::FLOAT32:    \
      func<float>(args);                     \
      break;                                 \
    case experimental::DataType::FLOAT64:    \
      func<double>(args);                    \
      break;                                 \
    case experimental::DataType::FLOAT16:    \
      func<gloo::float16>(args);             \
      break;                                 \
    case experimental::DataType::INT32:      \
      func<int32_t>(args);                   \
      break;                                 \
    case experimental::DataType::INT64:      \
      func<int64_t>(args);                   \
      break;                                 \
    case experimental::DataType::INT8:       \
      func<int8_t>(args);                    \
      break;                                 \
    case experimental::DataType::UINT8:      \
      func<uint8_t>(args);                   \
      break;                                 \
    case experimental::DataType::BOOL:       \
      func<bool>(args);                      \
      break;                                 \
    default:                                 \
      VLOG(0) << "Error: Unknown DataType."; \
      exit(-1);                              \
  }
#endif

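// Maps a ReduceOp to the elementwise reduction kernel gloo applies during
// (all)reduce. ReduceOp::AVG has no gloo counterpart, so it is rejected at
// runtime.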
typedef void (*reduce_func)(void*, const void*, const void*, size_t);

template <typename T>
reduce_func get_function(const ReduceOp& r) {
  switch (r) {
    case ReduceOp::SUM:
      return reduce_func(&::gloo::sum<T>);
    case ReduceOp::PRODUCT:
      return reduce_func(&::gloo::product<T>);
    case ReduceOp::MIN:
      return reduce_func(&::gloo::min<T>);
    case ReduceOp::MAX:
      return reduce_func(&::gloo::max<T>);
    case ReduceOp::AVG:
      VLOG(0) << "Error: Unsupported ReduceOp::AVG.";
      exit(-1);
  }

  VLOG(0) << "Error: Unknown ReduceOp.";
  exit(-1);
}

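// Helpers that expose phi::DenseTensor buffers as the raw typed pointers
// expected by gloo's setInput/setOutput option setters.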
template <typename T>
T* get_data(phi::DenseTensor& tensor) {  // NOLINT
  return reinterpret_cast<T*>(tensor.data());
}

template <typename T>
std::vector<T*> get_multi_data(
    std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  std::vector<T*> ret;
  ret.reserve(tensors.size());
  for (size_t i = 0; i < tensors.size(); i++) {
    ret.push_back(get_data<T>(tensors[i]));
  }
  return ret;
}

template <typename T, typename P>
void set_output(P& opts, phi::DenseTensor& tensor) {  // NOLINT
  opts.setOutput(get_data<T>(tensor), tensor.numel());
}

template <typename T, typename P>
void set_input(P& opts, phi::DenseTensor& tensor) {  // NOLINT
  opts.setInput(get_data<T>(tensor), tensor.numel());
}

template <typename T, typename P>
void set_outputs(P& opts,                                   // NOLINT
                 std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  opts.setOutputs(get_multi_data<T>(tensors), tensors[0].numel());
}

template <typename T, typename P>
void set_inputs(P& opts,                                   // NOLINT
                std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  opts.setInputs(get_multi_data<T>(tensors), tensors[0].numel());
}

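// Scatter expects one input pointer per destination rank, so the single
// source tensor is viewed as nranks equal chunks (numel() is assumed to be
// divisible by nranks).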
template <typename T, typename P>
void set_inputs_for_scatter(P& opts,                   // NOLINT
                            phi::DenseTensor& tensor,  // NOLINT
                            int nranks) {
  std::vector<T*> ret;
  ret.reserve(nranks);
  T* raw_pointer = reinterpret_cast<T*>(tensor.data());
  size_t offset = 0;
  for (int i = 0; i < nranks; i++) {
    ret.push_back(raw_pointer + offset);
    offset += tensor.numel() / nranks;
  }
  opts.setInputs(ret, tensor.numel() / nranks);
}

ProcessGroupGloo::GlooTask::GlooTask(
    int rank, const std::vector<phi::DenseTensor>& inputs, CommType comm_type)
    : ProcessGroup::Task(rank, inputs, comm_type) {}

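// Rendezvous: each rank registers itself in the shared store (keys are
// prefixed with the group id so several groups can share one store), then
// connects a full mesh of TCP pairs with all peers.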
ProcessGroupGloo::ProcessGroupGloo(
    const std::shared_ptr<distributed::Store>& store,
    int rank,
    int world_size,
    const platform::Place& place,
    int gid,
    const std::shared_ptr<GlooOptions> options)
    : ProcessGroup(rank, world_size, place, gid),
      _tag(0),
      _store(new GlooStore(store)) {
  _context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
  auto prefix_store =
      ::gloo::rendezvous::PrefixStore(std::to_string(gid), *_store);
  _context->connectFullMesh(prefix_store, options->device);
}

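// Broadcast: the root rank registers its tensor as the input; every rank,
// including the root, receives the result in its output tensor.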
class BroadcastGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  BroadcastGlooTask(const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    int rank,
                    int root,
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::BROADCAST),
        _context(context),
        _root(root),
        _inputs(inputs),
        _outputs(outputs),
        _tag(tag) {}

  void Run() override { _do_broadcast(_inputs[0], _outputs[0]); }

 private:
  std::shared_ptr<gloo::Context> _context;
  const int _root;
  std::vector<phi::DenseTensor> _inputs{};
  std::vector<phi::DenseTensor> _outputs{};
  const uint32_t _tag;

  void _do_broadcast(phi::DenseTensor& in, phi::DenseTensor& out) {  // NOLINT
    gloo::BroadcastOptions opts(_context);
    const auto& dtype = in.dtype();
    if (rank_ == _root) {
      GENERATE_FUNC(dtype, set_input, opts, in);
    }
    GENERATE_FUNC(dtype, set_output, opts, out);
    opts.setRoot(_root);
    opts.setTag(_tag);
    gloo::broadcast(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const BroadcastOptions& opts) {
  auto root = opts.source_rank;
  std::unique_ptr<BroadcastGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_unique<BroadcastGlooTask>(
      context, inputs, outputs, rank_, root, tag);
  task->Run();
  return task;
}

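// Allreduce: every rank contributes its inputs and receives the elementwise
// reduction (sum/product/min/max) of all contributions in its outputs.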
class AllreduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllreduceGlooTask(int rank,
                    const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    ReduceOp reduce_op,
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLREDUCE),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _reduce_op(reduce_op),
        _tag(tag) {}

  void Run() override { _do_allreduce(_inputs, _outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  const ReduceOp _reduce_op;
  uint32_t _tag;

  gloo::AllreduceOptions::Func _get_function(const experimental::DataType type,
                                             const ReduceOp op) {
    gloo::AllreduceOptions::Func fn;
    GENERATE_FUNC(type, _get_function_impl, fn, op);
    return fn;
  }

  template <typename T>
  void _get_function_impl(gloo::AllreduceOptions::Func& fn,  // NOLINT
                          const ReduceOp op) {
    fn = get_function<T>(op);
  }

  void _do_allreduce(std::vector<phi::DenseTensor>& ins,     // NOLINT
                     std::vector<phi::DenseTensor>& outs) {  // NOLINT
    const auto& dtype = ins[0].dtype();
    gloo::AllreduceOptions opts(_context);
    GENERATE_FUNC(dtype, set_inputs, opts, ins);
    GENERATE_FUNC(dtype, set_outputs, opts, outs);
    opts.setReduceFunction(_get_function(dtype, _reduce_op));
    opts.setTag(_tag);
    gloo::allreduce(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const AllreduceOptions& opts) {
  auto tag = next_tag();
  std::shared_ptr<GlooTask> task;
  auto context = get_context();
  task = std::make_shared<AllreduceGlooTask>(
      rank_, context, inputs, outputs, opts.reduce_op, tag);
  task->Run();
  return task;
}

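// Barrier: blocks until every rank in the group has entered the call.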
class BarrierGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  BarrierGlooTask(int rank, const std::shared_ptr<gloo::Context>& context)
      : ProcessGroupGloo::GlooTask(
            rank, std::vector<phi::DenseTensor>{}, CommType::BARRIER),
        _context(context) {}

  void Run() override { _do_barrier(); }

 private:
  std::shared_ptr<gloo::Context> _context;

  void _do_barrier() {
    gloo::BarrierOptions opts(_context);
    gloo::barrier(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Barrier(
    const BarrierOptions& opts) {
  std::shared_ptr<BarrierGlooTask> task;
  auto context = get_context();
  task = std::make_shared<BarrierGlooTask>(rank_, context);
  task->Run();
  return task;
}

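// Allgather: each rank contributes one tensor; the output tensor on every
// rank is expected to hold world_size times the input's elements and
// receives the concatenation of all ranks' inputs.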
class AllgatherGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllgatherGlooTask(int rank,
                    const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLGATHER),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _tag(tag) {}

  void Run() override { _do_allgather(_inputs, _outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  uint32_t _tag;

  void _do_allgather(std::vector<phi::DenseTensor>& in,     // NOLINT
                     std::vector<phi::DenseTensor>& out) {  // NOLINT
    const auto& dtype = in[0].dtype();
    gloo::AllgatherOptions opts(_context);
    GENERATE_FUNC(dtype, set_input, opts, in[0]);
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setTag(_tag);
    gloo::allgather(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllGather(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors) {
  std::shared_ptr<AllgatherGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<AllgatherGlooTask>(
      rank_, context, in_tensors, out_tensors, tag);
  task->Run();
  return task;
}

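// Reduce: like allreduce, except only the destination rank `dst` receives
// the reduced result.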
class ReduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  ReduceGlooTask(int rank,
                 const std::shared_ptr<gloo::Context>& context,
                 std::vector<phi::DenseTensor>& inputs,   // NOLINT
                 std::vector<phi::DenseTensor>& outputs,  // NOLINT
                 ReduceOp reduce_op,
                 int dst,
                 uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::REDUCE),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _reduce_op(reduce_op),
        _dst(dst),
        _tag(tag) {}

  void Run() override { _do_reduce(_inputs, _outputs, _dst); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  const ReduceOp _reduce_op;
  int _dst;
  uint32_t _tag;

  gloo::ReduceOptions::Func _get_function(const experimental::DataType type,
                                          const ReduceOp op) {
    gloo::ReduceOptions::Func fn;
    GENERATE_FUNC(type, _get_function_impl, fn, op);
    return fn;
  }

  template <typename T>
  void _get_function_impl(gloo::ReduceOptions::Func& fn,  // NOLINT
                          const ReduceOp op) {
    fn = get_function<T>(op);
  }

  void _do_reduce(std::vector<phi::DenseTensor>& inputs,   // NOLINT
                  std::vector<phi::DenseTensor>& outputs,  // NOLINT
                  int dst) {
    const auto& dtype = inputs[0].dtype();
    gloo::ReduceOptions opts(_context);
    GENERATE_FUNC(dtype, set_input, opts, inputs[0]);
    GENERATE_FUNC(dtype, set_output, opts, outputs[0]);
    opts.setReduceFunction(_get_function(dtype, _reduce_op));
    opts.setTag(_tag);
    opts.setRoot(dst);
    gloo::reduce(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const ReduceOptions& opts) {
  std::shared_ptr<ReduceGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<ReduceGlooTask>(
      rank_, context, inputs, outputs, opts.reduce_op, opts.root_rank, tag);
  task->Run();
  return task;
}

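// Scatter: the source rank splits its input into `size` chunks and sends the
// i-th chunk to rank i; every rank receives exactly one chunk in its output.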
class ScatterGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  ScatterGlooTask(int rank,
                  const std::shared_ptr<gloo::Context>& context,
                  std::vector<phi::DenseTensor>& inputs,   // NOLINT
                  std::vector<phi::DenseTensor>& outputs,  // NOLINT
                  int src,
                  int size,
                  uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::SCATTER),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _src(src),
        _size(size),
        _tag(tag) {}

  void Run() override { _do_scatter(_inputs, _outputs, _src); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  int _src;
  int _size;
  uint32_t _tag;

  void _do_scatter(std::vector<phi::DenseTensor>& in,   // NOLINT
                   std::vector<phi::DenseTensor>& out,  // NOLINT
                   int src) {
    const auto& dtype = in[0].dtype();
    gloo::ScatterOptions opts(_context);
    if (rank_ == src) {
      GENERATE_FUNC(dtype, set_inputs_for_scatter, opts, in[0], _size);
    }
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setRoot(src);
    opts.setTag(_tag);
    gloo::scatter(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const ScatterOptions& opts) {
  std::shared_ptr<ScatterGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<ScatterGlooTask>(
      rank_, context, in_tensors, out_tensors, opts.root_rank, size_, tag);
  task->Run();
  return task;
}

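// TCP transport device helpers. createDefaultDevice resolves the local
// hostname, probes the returned addresses by binding a throwaway socket, and
// falls back to the loopback address if none of them is usable.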
std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForInterface(const std::string& ifname) {
  ::gloo::transport::tcp::attr attr;
  attr.iface = ifname;
  return ::gloo::transport::tcp::CreateDevice(attr);
}

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForHostname(const std::string& hostname) {
  ::gloo::transport::tcp::attr attr;
  attr.hostname = hostname;
  return ::gloo::transport::tcp::CreateDevice(attr);
}

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDefaultDevice() {
  std::array<char, HOST_NAME_MAX> hostname{};
  auto ret = ::gethostname(hostname.data(), HOST_NAME_MAX);
  PADDLE_ENFORCE_EQ(
      ret,
      0,
      platform::errors::Fatal("Get hostname error for createDefaultDevice."));
  ::addrinfo* result;
  result = tcputils::get_addr_info(hostname.data(), "", 0, AF_UNSPEC);
  ::addrinfo* cur;
  for (cur = result; cur != nullptr; cur = cur->ai_next) {
    SocketType socket =
        ::socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
    if (socket == -1) {
      continue;
    }
    ret = ::bind(socket, cur->ai_addr, cur->ai_addrlen);
#ifdef _WIN32
    closesocket(socket);
#else
    close(socket);
#endif
    if (ret == -1) {
      continue;
    }
    break;
  }
  freeaddrinfo(result);
  if (cur != nullptr) {
    return createDeviceForHostname(hostname.data());
  }
  return createDeviceForHostname("127.0.0.1");
}

}  // namespace distributed
}  // namespace paddle