// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <iostream>

#ifdef _WIN32
#include <gloo/common/win.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#endif

#include <gloo/broadcast.h>
#include <gloo/reduce.h>
#include <gloo/scatter.h>

#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace distributed {

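// GENERATE_FUNC dispatches on the runtime DataType and instantiates the given
// function template with the matching C++ type. The Windows variant covers a
// smaller set of types than the POSIX variant below.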
#ifdef _WIN32
#define GENERATE_FUNC(type, func, ...)       \
  switch (type) {                            \
    case experimental::DataType::FLOAT32:    \
      func<float>(__VA_ARGS__);              \
      break;                                 \
    case experimental::DataType::FLOAT64:    \
      func<double>(__VA_ARGS__);             \
      break;                                 \
    case experimental::DataType::FLOAT16:    \
      func<gloo::float16>(__VA_ARGS__);      \
      break;                                 \
    case experimental::DataType::INT32:      \
      func<int32_t>(__VA_ARGS__);            \
      break;                                 \
    case experimental::DataType::INT64:      \
      func<int64_t>(__VA_ARGS__);            \
      break;                                 \
    default:                                 \
      VLOG(0) << "Error: Unknown DataType."; \
      exit(-1);                              \
  }

#define HOST_NAME_MAX 256

#else
#define GENERATE_FUNC(type, func, args...)   \
  switch (type) {                            \
    case experimental::DataType::FLOAT32:    \
      func<float>(args);                     \
      break;                                 \
    case experimental::DataType::FLOAT64:    \
      func<double>(args);                    \
      break;                                 \
    case experimental::DataType::FLOAT16:    \
      func<gloo::float16>(args);             \
      break;                                 \
    case experimental::DataType::INT32:      \
      func<int32_t>(args);                   \
      break;                                 \
    case experimental::DataType::INT64:      \
      func<int64_t>(args);                   \
      break;                                 \
    case experimental::DataType::INT8:       \
      func<int8_t>(args);                    \
      break;                                 \
    case experimental::DataType::UINT8:      \
      func<uint8_t>(args);                   \
      break;                                 \
    case experimental::DataType::BOOL:       \
      func<bool>(args);                      \
      break;                                 \
    case experimental::DataType::BFLOAT16:   \
      func<bfloat16>(args);                  \
      break;                                 \
    default:                                 \
      VLOG(0) << "Error: Unknown DataType."; \
      exit(-1);                              \
  }
#endif

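// Selects the element-wise gloo reduction function for a ReduceOp.
// ReduceOp::AVG has no gloo counterpart and aborts the process.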
typedef void (*reduce_func)(void*, const void*, const void*, size_t);

template <typename T>
reduce_func get_function(const ReduceOp& r) {
  switch (r) {
    case ReduceOp::SUM:
      return reduce_func(&::gloo::sum<T>);
    case ReduceOp::PRODUCT:
      return reduce_func(&::gloo::product<T>);
    case ReduceOp::MIN:
      return reduce_func(&::gloo::min<T>);
    case ReduceOp::MAX:
      return reduce_func(&::gloo::max<T>);
    case ReduceOp::AVG:
      VLOG(0) << "Error: Unsupported ReduceOp::AVG.";
      exit(-1);
  }

  VLOG(0) << "Error: Unknown ReduceOp.";
  exit(-1);
}

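// Helpers for wiring phi::DenseTensor buffers into gloo collective options.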
template <typename T>
T* get_data(phi::DenseTensor& tensor) {  // NOLINT
  return reinterpret_cast<T*>(tensor.data());
}

template <typename T>
std::vector<T*> get_multi_data(
    std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  std::vector<T*> ret;
  ret.reserve(tensors.size());
  for (size_t i = 0; i < tensors.size(); i++) {
    ret.push_back(get_data<T>(tensors[i]));
  }
  return ret;
}

template <typename T, typename P>
void set_output(P& opts, phi::DenseTensor& tensor) {  // NOLINT
  opts.setOutput(get_data<T>(tensor), tensor.numel());
}

template <typename T, typename P>
void set_input(P& opts, phi::DenseTensor& tensor) {  // NOLINT
  opts.setInput(get_data<T>(tensor), tensor.numel());
}

template <typename T, typename P>
void set_outputs(P& opts,                                   // NOLINT
                 std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  opts.setOutputs(get_multi_data<T>(tensors), tensors[0].numel());
}

template <typename T, typename P>
void set_inputs(P& opts,                                   // NOLINT
                std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  opts.setInputs(get_multi_data<T>(tensors), tensors[0].numel());
}

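// Splits a tensor into nranks equally sized chunks and registers them as the
// scatter inputs on the root rank.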
template <typename T, typename P>
void set_inputs_for_scatter(P& opts,                   // NOLINT
                            phi::DenseTensor& tensor,  // NOLINT
                            int nranks) {
  std::vector<T*> ret;
  ret.reserve(nranks);
  T* raw_pointer = reinterpret_cast<T*>(tensor.data());
  size_t offset = 0;
  for (int i = 0; i < nranks; i++) {
    ret.push_back(raw_pointer + offset);
    offset += tensor.numel() / nranks;
  }
  opts.setInputs(ret, tensor.numel() / nranks);
}

ProcessGroupGloo::GlooTask::GlooTask(
    int rank, const std::vector<phi::DenseTensor>& inputs, CommType comm_type)
    : ProcessGroup::Task(rank, inputs, comm_type) {}

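// Rendezvous: peer addresses are exchanged through the store (namespaced by
// the group id via a PrefixStore) and a full mesh of connections is
// established over options->device.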
ProcessGroupGloo::ProcessGroupGloo(
    const std::shared_ptr<distributed::Store>& store,
    int rank,
    int world_size,
    const platform::Place& place,
    int gid,
    const std::shared_ptr<GlooOptions> options)
    : ProcessGroup(rank, world_size, place, gid),
      _tag(0),
      _store(new GlooStore(store)) {
  _context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
  auto prefix_store =
      ::gloo::rendezvous::PrefixStore(std::to_string(gid), *_store);
  _context->connectFullMesh(prefix_store, options->device);
}

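// Each collective below is wrapped in a GlooTask subclass whose Run() builds
// the gloo options for the operation and executes it synchronously on the
// calling thread.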
class BroadcastGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  BroadcastGlooTask(const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    int rank,
                    int root,
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::BROADCAST),
        _context(context),
        _root(root),
        _inputs(inputs),
        _outputs(outputs),
        _tag(tag) {}

  void Run() override { _do_broadcast(_inputs[0], _outputs[0]); }

 private:
  std::shared_ptr<gloo::Context> _context;
  const int _root;
  std::vector<phi::DenseTensor> _inputs{};
  std::vector<phi::DenseTensor> _outputs{};
  const uint32_t _tag;

  void _do_broadcast(phi::DenseTensor& in, phi::DenseTensor& out) {  // NOLINT
    gloo::BroadcastOptions opts(_context);
    const auto& dtype = in.dtype();
    if (rank_ == _root) {
      GENERATE_FUNC(dtype, set_input, opts, in);
    }
    GENERATE_FUNC(dtype, set_output, opts, out);
    opts.setRoot(_root);
    opts.setTag(_tag);
    gloo::broadcast(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const BroadcastOptions& opts) {
  return Broadcast(inputs, outputs, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const BroadcastOptions& opts,
    bool sync_op) {
  auto root = opts.source_rank;
  std::unique_ptr<BroadcastGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_unique<BroadcastGlooTask>(
      context, inputs, outputs, rank_, root, tag);
  task->Run();
  return task;
}

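// All-reduce: every rank contributes its inputs and receives the reduced
// result; the reduction function is chosen from the ReduceOp and tensor dtype.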
class AllreduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllreduceGlooTask(int rank,
                    const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    ReduceOp reduce_op,
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLREDUCE),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _reduce_op(reduce_op),
        _tag(tag) {}

  void Run() override { _do_allreduce(_inputs, _outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  const ReduceOp _reduce_op;
  uint32_t _tag;

  gloo::AllreduceOptions::Func _get_function(const experimental::DataType type,
                                             const ReduceOp op) {
    gloo::AllreduceOptions::Func fn;
    GENERATE_FUNC(type, _get_function_impl, fn, op);
    return fn;
  }

  template <typename T>
  void _get_function_impl(gloo::AllreduceOptions::Func& fn,  // NOLINT
                          const ReduceOp op) {
    fn = get_function<T>(op);
  }

  void _do_allreduce(std::vector<phi::DenseTensor>& ins,     // NOLINT
                     std::vector<phi::DenseTensor>& outs) {  // NOLINT
    const auto& dtype = ins[0].dtype();
    gloo::AllreduceOptions opts(_context);
    GENERATE_FUNC(dtype, set_inputs, opts, ins);
    GENERATE_FUNC(dtype, set_outputs, opts, outs);
    opts.setReduceFunction(_get_function(dtype, _reduce_op));
    opts.setTag(_tag);
    gloo::allreduce(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const AllreduceOptions& opts) {
  return AllReduce(inputs, outputs, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const AllreduceOptions& opts,
    bool sync_op) {
  auto tag = next_tag();
  std::shared_ptr<GlooTask> task;
  auto context = get_context();
  task = std::make_shared<AllreduceGlooTask>(
      rank_, context, inputs, outputs, opts.reduce_op, tag);
  task->Run();
  return task;
}

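// Barrier: returns once every rank in the group has entered the barrier.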
class BarrierGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  BarrierGlooTask(int rank, const std::shared_ptr<gloo::Context>& context)
      : ProcessGroupGloo::GlooTask(
            rank, std::vector<phi::DenseTensor>{}, CommType::BARRIER),
        _context(context) {}

  void Run() override { _do_barrier(); }

 private:
  std::shared_ptr<gloo::Context> _context;

  void _do_barrier() {
    gloo::BarrierOptions opts(_context);
    gloo::barrier(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Barrier(
    const BarrierOptions& opts) {
  std::shared_ptr<BarrierGlooTask> task;
  auto context = get_context();
  task = std::make_shared<BarrierGlooTask>(rank_, context);
  task->Run();
  return task;
}

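// All-gather: each rank sends in[0] and receives the concatenation of all
// ranks' inputs in out[0].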
class AllgatherGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllgatherGlooTask(int rank,
                    const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLGATHER),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _tag(tag) {}

  void Run() override { _do_allgather(_inputs, _outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  uint32_t _tag;

  void _do_allgather(std::vector<phi::DenseTensor>& in,     // NOLINT
                     std::vector<phi::DenseTensor>& out) {  // NOLINT
    const auto& dtype = in[0].dtype();
    gloo::AllgatherOptions opts(_context);
    GENERATE_FUNC(dtype, set_input, opts, in[0]);
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setTag(_tag);
    gloo::allgather(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllGather(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors) {
  std::shared_ptr<AllgatherGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<AllgatherGlooTask>(
      rank_, context, in_tensors, out_tensors, tag);
  task->Run();
  return task;
}

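// Reduce: like all-reduce, but only the destination rank receives the reduced
// result.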
class ReduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  ReduceGlooTask(int rank,
                 const std::shared_ptr<gloo::Context>& context,
                 std::vector<phi::DenseTensor>& inputs,   // NOLINT
                 std::vector<phi::DenseTensor>& outputs,  // NOLINT
                 ReduceOp reduce_op,
                 int dst,
                 uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::REDUCE),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _reduce_op(reduce_op),
        _dst(dst),
        _tag(tag) {}

  void Run() override { _do_reduce(_inputs, _outputs, _dst); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  const ReduceOp _reduce_op;
  int _dst;
  uint32_t _tag;

  gloo::ReduceOptions::Func _get_function(const experimental::DataType type,
                                          const ReduceOp op) {
    gloo::ReduceOptions::Func fn;
    GENERATE_FUNC(type, _get_function_impl, fn, op);
    return fn;
  }

  template <typename T>
  void _get_function_impl(gloo::ReduceOptions::Func& fn,  // NOLINT
                          const ReduceOp op) {
    fn = get_function<T>(op);
  }

  void _do_reduce(std::vector<phi::DenseTensor>& inputs,   // NOLINT
                  std::vector<phi::DenseTensor>& outputs,  // NOLINT
                  int dst) {
    const auto& dtype = inputs[0].dtype();
    gloo::ReduceOptions opts(_context);
    GENERATE_FUNC(dtype, set_input, opts, inputs[0]);
    GENERATE_FUNC(dtype, set_output, opts, outputs[0]);
    opts.setReduceFunction(_get_function(dtype, _reduce_op));
    opts.setTag(_tag);
    opts.setRoot(dst);
    gloo::reduce(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const ReduceOptions& opts) {
  return Reduce(inputs, outputs, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const ReduceOptions& opts,
    bool sync_op) {
  std::shared_ptr<ReduceGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<ReduceGlooTask>(
      rank_, context, inputs, outputs, opts.reduce_op, opts.root_rank, tag);
  task->Run();
  return task;
}

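// Scatter: the source rank splits in[0] into one chunk per rank; every rank
// receives its chunk in out[0].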
class ScatterGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  ScatterGlooTask(int rank,
                  const std::shared_ptr<gloo::Context>& context,
                  std::vector<phi::DenseTensor>& inputs,   // NOLINT
                  std::vector<phi::DenseTensor>& outputs,  // NOLINT
                  int src,
                  int size,
                  uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::SCATTER),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _src(src),
        _size(size),
        _tag(tag) {}

  void Run() override { _do_scatter(_inputs, _outputs, _src); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  int _src;
  int _size;
  uint32_t _tag;

  void _do_scatter(std::vector<phi::DenseTensor>& in,   // NOLINT
                   std::vector<phi::DenseTensor>& out,  // NOLINT
                   int src) {
    const auto& dtype = in[0].dtype();
    gloo::ScatterOptions opts(_context);
    if (rank_ == src) {
      GENERATE_FUNC(dtype, set_inputs_for_scatter, opts, in[0], _size);
    }
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setRoot(src);
    opts.setTag(_tag);
    gloo::scatter(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const ScatterOptions& opts) {
  return Scatter(in_tensors, out_tensors, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const ScatterOptions& opts,
    bool sync_op) {
  std::shared_ptr<ScatterGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<ScatterGlooTask>(
      rank_, context, in_tensors, out_tensors, opts.root_rank, size_, tag);
  task->Run();
  return task;
}

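// Device helpers: create a gloo TCP transport device bound to a specific
// network interface, a specific hostname, or, by default, the local hostname
// with a fallback to the loopback address if no resolved address can be bound.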
std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForInterface(const std::string& ifname) {
  ::gloo::transport::tcp::attr attr;
  attr.iface = ifname;
  return ::gloo::transport::tcp::CreateDevice(attr);
}

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForHostname(const std::string& hostname) {
  ::gloo::transport::tcp::attr attr;
  attr.hostname = hostname;
  return ::gloo::transport::tcp::CreateDevice(attr);
}

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDefaultDevice() {
  std::array<char, HOST_NAME_MAX> hostname{};
  auto ret = ::gethostname(hostname.data(), HOST_NAME_MAX);
  PADDLE_ENFORCE_EQ(
      ret,
      0,
      platform::errors::Fatal("Get hostname error for createDefaultDevice."));
  ::addrinfo* result;
  result = tcputils::get_addr_info(hostname.data(), "", 0, AF_UNSPEC);
  ::addrinfo* cur;
  for (cur = result; cur != nullptr; cur = cur->ai_next) {
    SocketType socket =
        ::socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
    if (socket == -1) {
      continue;
    }
    ret = ::bind(socket, cur->ai_addr, cur->ai_addrlen);
#ifdef _WIN32
    closesocket(socket);
#else
    close(socket);
#endif
    if (ret == -1) {
      continue;
    }
    break;
  }
  freeaddrinfo(result);
  if (cur != nullptr) {
    return createDeviceForHostname(hostname.data());
  }
  return createDeviceForHostname("127.0.0.1");
}

}  // namespace distributed
}  // namespace paddle