// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <iostream>

#ifdef _WIN32
#include <gloo/common/win.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#endif

#include <gloo/broadcast.h>
#include <gloo/reduce.h>
#include <gloo/scatter.h>

#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace distributed {
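
// GENERATE_FUNC dispatches a templated callable on a runtime
// experimental::DataType: it expands to a switch that calls func<T>(...)
// with the matching C++ type (float, double, gloo::float16, int32_t, ...).
// Unknown dtypes abort the process. The Windows variant covers fewer
// dtypes than the POSIX one.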

#ifdef _WIN32
#define GENERATE_FUNC(type, func, ...)       \
  switch (type) {                            \
    case experimental::DataType::FLOAT32:    \
      func<float>(__VA_ARGS__);              \
      break;                                 \
    case experimental::DataType::FLOAT64:    \
      func<double>(__VA_ARGS__);             \
      break;                                 \
    case experimental::DataType::FLOAT16:    \
      func<gloo::float16>(__VA_ARGS__);      \
      break;                                 \
    case experimental::DataType::INT32:      \
      func<int32_t>(__VA_ARGS__);            \
      break;                                 \
    case experimental::DataType::INT64:      \
      func<int64_t>(__VA_ARGS__);            \
      break;                                 \
    default:                                 \
      VLOG(0) << "Error: Unknown DataType."; \
      exit(-1);                              \
  }

#define HOST_NAME_MAX 256

#else
#define GENERATE_FUNC(type, func, args...)   \
  switch (type) {                            \
    case experimental::DataType::FLOAT32:    \
      func<float>(args);                     \
      break;                                 \
    case experimental::DataType::FLOAT64:    \
      func<double>(args);                    \
      break;                                 \
    case experimental::DataType::FLOAT16:    \
      func<gloo::float16>(args);             \
      break;                                 \
    case experimental::DataType::INT32:      \
      func<int32_t>(args);                   \
      break;                                 \
    case experimental::DataType::INT64:      \
      func<int64_t>(args);                   \
      break;                                 \
    case experimental::DataType::INT8:       \
      func<int8_t>(args);                    \
      break;                                 \
    case experimental::DataType::UINT8:      \
      func<uint8_t>(args);                   \
      break;                                 \
    case experimental::DataType::BOOL:       \
      func<bool>(args);                      \
      break;                                 \
    case experimental::DataType::BFLOAT16:   \
      func<bfloat16>(args);                  \
      break;                                 \
    default:                                 \
      VLOG(0) << "Error: Unknown DataType."; \
      exit(-1);                              \
  }
#endif
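
// reduce_func matches the signature of gloo's elementwise reduction
// kernels; get_function() maps a ReduceOp onto gloo::sum / product /
// min / max for the element type T. AVG has no gloo kernel and aborts.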

typedef void (*reduce_func)(void*, const void*, const void*, size_t);

template <typename T>
reduce_func get_function(const ReduceOp& r) {
  switch (r) {
    case ReduceOp::SUM:
      return reduce_func(&::gloo::sum<T>);
    case ReduceOp::PRODUCT:
      return reduce_func(&::gloo::product<T>);
    case ReduceOp::MIN:
      return reduce_func(&::gloo::min<T>);
    case ReduceOp::MAX:
      return reduce_func(&::gloo::max<T>);
    case ReduceOp::AVG:
      VLOG(0) << "Error: Unsupported ReduceOp::AVG.";
      exit(-1);
  }

  VLOG(0) << "Error: Unknown ReduceOp.";
  exit(-1);
}
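
// The helpers below expose raw tensor buffers to gloo options objects:
// get_data / get_multi_data fetch typed pointers, and set_input(s) /
// set_output(s) register them, together with element counts, on the
// collective's options struct.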

template <typename T>
T* get_data(phi::DenseTensor& tensor) {  // NOLINT
  return reinterpret_cast<T*>(tensor.data());
}

template <typename T>
std::vector<T*> get_multi_data(
    std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  std::vector<T*> ret;
  ret.reserve(tensors.size());
  for (size_t i = 0; i < tensors.size(); i++) {
    ret.push_back(get_data<T>(tensors[i]));
  }
  return ret;
}

template <typename T, typename P>
void set_output(P& opts, phi::DenseTensor& tensor) {  // NOLINT
  opts.setOutput(get_data<T>(tensor), tensor.numel());
}

template <typename T, typename P>
void set_input(P& opts, phi::DenseTensor& tensor) {  // NOLINT
  opts.setInput(get_data<T>(tensor), tensor.numel());
}

template <typename T, typename P>
void set_outputs(P& opts,                                   // NOLINT
                 std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  opts.setOutputs(get_multi_data<T>(tensors), tensors[0].numel());
}

template <typename T, typename P>
void set_inputs(P& opts,                                   // NOLINT
                std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  opts.setInputs(get_multi_data<T>(tensors), tensors[0].numel());
}
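
// For scatter, the root's input tensor is split into nranks equally sized
// chunks and each chunk is registered as a separate gloo input, so rank i
// receives the i-th slice.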

template <typename T, typename P>
void set_inputs_for_scatter(P& opts,                   // NOLINT
                            phi::DenseTensor& tensor,  // NOLINT
                            int nranks) {
  std::vector<T*> ret;
  ret.reserve(nranks);
  T* raw_pointer = reinterpret_cast<T*>(tensor.data());
  size_t offset = 0;
  for (int i = 0; i < nranks; i++) {
    ret.push_back(raw_pointer + offset);
    offset += tensor.numel() / nranks;
  }
  opts.setInputs(ret, tensor.numel() / nranks);
}
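
// GlooTask only records task metadata; the collectives below execute
// synchronously inside Run(), which is invoked directly on the calling
// thread.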

ProcessGroupGloo::GlooTask::GlooTask(
    int rank, const std::vector<phi::DenseTensor>& inputs, CommType comm_type)
    : ProcessGroup::Task(rank, inputs, comm_type) {}
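
// Rendezvous: keys in the shared Store are namespaced by the group id via a
// gloo PrefixStore, and a full mesh among all ranks is connected over
// options->device (a gloo transport device, e.g. one built by
// createDefaultDevice()).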

ProcessGroupGloo::ProcessGroupGloo(
    const std::shared_ptr<distributed::Store>& store,
    int rank,
    int world_size,
    const platform::Place& place,
    int gid,
    const std::shared_ptr<GlooOptions> options)
    : ProcessGroup(rank, world_size, place, gid),
      _tag(0),
      _store(new GlooStore(store)) {
  _context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
  auto prefix_store =
      ::gloo::rendezvous::PrefixStore(std::to_string(gid), *_store);
  _context->connectFullMesh(prefix_store, options->device);
}
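
// Broadcast: the root rank registers its tensor as the gloo input, every
// rank registers an output buffer, and gloo::broadcast copies the root's
// data into all outputs.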

class BroadcastGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  BroadcastGlooTask(const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    int rank,
                    int root,
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::BROADCAST),
        _context(context),
        _root(root),
        _inputs(inputs),
        _outputs(outputs),
        _tag(tag) {}

  void Run() override { _do_broadcast(_inputs[0], _outputs[0]); }

 private:
  std::shared_ptr<gloo::Context> _context;
  const int _root;
  std::vector<phi::DenseTensor> _inputs{};
  std::vector<phi::DenseTensor> _outputs{};
  const uint32_t _tag;

  void _do_broadcast(phi::DenseTensor& in, phi::DenseTensor& out) {  // NOLINT
    gloo::BroadcastOptions opts(_context);
    const auto& dtype = in.dtype();
    if (rank_ == _root) {
      GENERATE_FUNC(dtype, set_input, opts, in);
    }
    GENERATE_FUNC(dtype, set_output, opts, out);
    opts.setRoot(_root);
    opts.setTag(_tag);
    gloo::broadcast(opts);
  }
};

// TODO(sunyilun): for compatibility, will be updated later
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
    phi::DenseTensor* out_tensor,
    const phi::DenseTensor& in_tensor,
    const BroadcastOptions& opts,
    bool sync_op) {
  std::vector<phi::DenseTensor> in_wrapper = {in_tensor};
  std::vector<phi::DenseTensor> out_wrapper = {*out_tensor};
  return Broadcast(in_wrapper, out_wrapper, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const BroadcastOptions& opts) {
  return Broadcast(inputs, outputs, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const BroadcastOptions& opts,
    bool sync_op) {
  auto root = opts.source_rank;
  std::unique_ptr<BroadcastGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_unique<BroadcastGlooTask>(
      context, inputs, outputs, rank_, root, tag);
  task->Run();
  return task;
}
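
// Allreduce: every rank contributes its input tensors and receives the
// elementwise reduction (per _reduce_op) of all contributions in its
// outputs.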

class AllreduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllreduceGlooTask(int rank,
                    const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    ReduceOp reduce_op,
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLREDUCE),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _reduce_op(reduce_op),
        _tag(tag) {}

  void Run() override { _do_allreduce(_inputs, _outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  const ReduceOp _reduce_op;
  uint32_t _tag;

  gloo::AllreduceOptions::Func _get_function(const experimental::DataType type,
                                             const ReduceOp op) {
    gloo::AllreduceOptions::Func fn;
    GENERATE_FUNC(type, _get_function_impl, fn, op);
    return fn;
  }

  template <typename T>
  void _get_function_impl(gloo::AllreduceOptions::Func& fn,  // NOLINT
                          const ReduceOp op) {
    fn = get_function<T>(op);
  }

  void _do_allreduce(std::vector<phi::DenseTensor>& ins,     // NOLINT
                     std::vector<phi::DenseTensor>& outs) {  // NOLINT
    const auto& dtype = ins[0].dtype();
    gloo::AllreduceOptions opts(_context);
    GENERATE_FUNC(dtype, set_inputs, opts, ins);
    GENERATE_FUNC(dtype, set_outputs, opts, outs);
    opts.setReduceFunction(_get_function(dtype, _reduce_op));
    opts.setTag(_tag);
    gloo::allreduce(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const AllreduceOptions& opts) {
  return AllReduce(inputs, outputs, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const AllreduceOptions& opts,
    bool sync_op) {
  auto tag = next_tag();
  std::shared_ptr<GlooTask> task;
  auto context = get_context();
  task = std::make_shared<AllreduceGlooTask>(
      rank_, context, inputs, outputs, opts.reduce_op, tag);
  task->Run();
  return task;
}
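
// Barrier: a collective with no payload; Run() returns only after every
// rank in the group has entered gloo::barrier.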

class BarrierGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  BarrierGlooTask(int rank, const std::shared_ptr<gloo::Context>& context)
      : ProcessGroupGloo::GlooTask(
            rank, std::vector<phi::DenseTensor>{}, CommType::BARRIER),
        _context(context) {}

  void Run() override { _do_barrier(); }

 private:
  std::shared_ptr<gloo::Context> _context;

  void _do_barrier() {
    gloo::BarrierOptions opts(_context);
    gloo::barrier(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Barrier(
    const BarrierOptions& opts) {
  std::shared_ptr<BarrierGlooTask> task;
  auto context = get_context();
  task = std::make_shared<BarrierGlooTask>(rank_, context);
  task->Run();
  return task;
}
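
// Allgather: each rank contributes in[0]; out[0] is expected to hold
// world_size * in[0].numel() elements and receives every rank's
// contribution in rank order.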

class AllgatherGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllgatherGlooTask(int rank,
                    const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLGATHER),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _tag(tag) {}

  void Run() override { _do_allgather(_inputs, _outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  uint32_t _tag;

  void _do_allgather(std::vector<phi::DenseTensor>& in,     // NOLINT
                     std::vector<phi::DenseTensor>& out) {  // NOLINT
    const auto& dtype = in[0].dtype();
    gloo::AllgatherOptions opts(_context);
    GENERATE_FUNC(dtype, set_input, opts, in[0]);
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setTag(_tag);
    gloo::allgather(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllGather(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors) {
  std::shared_ptr<AllgatherGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<AllgatherGlooTask>(
      rank_, context, in_tensors, out_tensors, tag);
  task->Run();
  return task;
}
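
// Reduce: like allreduce, but only the destination rank `dst` receives the
// reduced result in its output tensor.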

class ReduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  ReduceGlooTask(int rank,
                 const std::shared_ptr<gloo::Context>& context,
                 std::vector<phi::DenseTensor>& inputs,   // NOLINT
                 std::vector<phi::DenseTensor>& outputs,  // NOLINT
                 ReduceOp reduce_op,
                 int dst,
                 uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::REDUCE),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _reduce_op(reduce_op),
        _dst(dst),
        _tag(tag) {}

  void Run() override { _do_reduce(_inputs, _outputs, _dst); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  const ReduceOp _reduce_op;
  int _dst;
  uint32_t _tag;

  gloo::ReduceOptions::Func _get_function(const experimental::DataType type,
                                          const ReduceOp op) {
    gloo::ReduceOptions::Func fn;
    GENERATE_FUNC(type, _get_function_impl, fn, op);
    return fn;
  }

  template <typename T>
  void _get_function_impl(gloo::ReduceOptions::Func& fn,  // NOLINT
                          const ReduceOp op) {
    fn = get_function<T>(op);
  }

  void _do_reduce(std::vector<phi::DenseTensor>& inputs,   // NOLINT
                  std::vector<phi::DenseTensor>& outputs,  // NOLINT
                  int dst) {
    const auto& dtype = inputs[0].dtype();
    gloo::ReduceOptions opts(_context);
    GENERATE_FUNC(dtype, set_input, opts, inputs[0]);
    GENERATE_FUNC(dtype, set_output, opts, outputs[0]);
    opts.setReduceFunction(_get_function(dtype, _reduce_op));
    opts.setTag(_tag);
    opts.setRoot(dst);
    gloo::reduce(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const ReduceOptions& opts) {
  return Reduce(inputs, outputs, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const ReduceOptions& opts,
    bool sync_op) {
  std::shared_ptr<ReduceGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<ReduceGlooTask>(
      rank_, context, inputs, outputs, opts.reduce_op, opts.root_rank, tag);
  task->Run();
  return task;
}
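
// Scatter: the source rank splits in[0] into `size` (world size) chunks via
// set_inputs_for_scatter; every rank receives its own chunk in out[0].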

class ScatterGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  ScatterGlooTask(int rank,
                  const std::shared_ptr<gloo::Context>& context,
                  std::vector<phi::DenseTensor>& inputs,   // NOLINT
                  std::vector<phi::DenseTensor>& outputs,  // NOLINT
                  int src,
                  int size,
                  uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::SCATTER),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _src(src),
        _size(size),
        _tag(tag) {}

  void Run() override { _do_scatter(_inputs, _outputs, _src); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  int _src;
  int _size;
  uint32_t _tag;

  void _do_scatter(std::vector<phi::DenseTensor>& in,   // NOLINT
                   std::vector<phi::DenseTensor>& out,  // NOLINT
                   int src) {
    const auto& dtype = in[0].dtype();
    gloo::ScatterOptions opts(_context);
    if (rank_ == src) {
      GENERATE_FUNC(dtype, set_inputs_for_scatter, opts, in[0], _size);
    }
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setRoot(src);
    opts.setTag(_tag);
    gloo::scatter(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const ScatterOptions& opts) {
  return Scatter(in_tensors, out_tensors, opts, true);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const ScatterOptions& opts,
    bool sync_op) {
  std::shared_ptr<ScatterGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<ScatterGlooTask>(
      rank_, context, in_tensors, out_tensors, opts.root_rank, size_, tag);
  task->Run();
  return task;
}
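
// Device helpers: create the gloo TCP transport device used for rendezvous,
// bound to an explicit interface or hostname, or by default to the local
// hostname, falling back to 127.0.0.1 when no local address can be bound.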

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForInterface(const std::string& ifname) {
  ::gloo::transport::tcp::attr attr;
  attr.iface = ifname;
  return ::gloo::transport::tcp::CreateDevice(attr);
}

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForHostname(const std::string& hostname) {
  ::gloo::transport::tcp::attr attr;
  attr.hostname = hostname;
  return ::gloo::transport::tcp::CreateDevice(attr);
}

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDefaultDevice() {
  std::array<char, HOST_NAME_MAX> hostname{};
  auto ret = ::gethostname(hostname.data(), HOST_NAME_MAX);
  PADDLE_ENFORCE_EQ(
      ret,
      0,
      platform::errors::Fatal("Get hostname error for createDefaultDevice."));
  ::addrinfo* result;
  result = tcputils::get_addr_info(hostname.data(), "", 0, AF_UNSPEC);
  ::addrinfo* cur;
  for (cur = result; cur != nullptr; cur = cur->ai_next) {
    SocketType socket =
        ::socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
    if (socket == -1) {
      continue;
    }
    ret = ::bind(socket, cur->ai_addr, cur->ai_addrlen);
#ifdef _WIN32
    closesocket(socket);
#else
    close(socket);
#endif
    if (ret == -1) {
      continue;
    }
    break;
  }
  freeaddrinfo(result);
  if (cur != nullptr) {
    return createDeviceForHostname(hostname.data());
  }
  return createDeviceForHostname("127.0.0.1");
}

}  // namespace distributed
}  // namespace paddle