ProcessGroupGloo.cc
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <iostream>

#ifdef _WIN32
#include <gloo/common/win.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#endif

#include <gloo/broadcast.h>
#include <gloo/reduce.h>
#include <gloo/scatter.h>

#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace distributed {

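// GENERATE_FUNC dispatches a templated helper on the runtime dtype of a
// tensor. For example,
//   GENERATE_FUNC(dtype, set_output, opts, tensor);
// calls set_output<float>(opts, tensor) when dtype is FLOAT32. Unknown
// dtypes abort the process. The Windows variant only differs in how the
// variadic arguments are forwarded.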
#ifdef _WIN32
#define GENERATE_FUNC(type, func, ...)       \
  switch (type) {                            \
    case experimental::DataType::FLOAT32:    \
      func<float>(__VA_ARGS__);              \
      break;                                 \
    case experimental::DataType::FLOAT64:    \
      func<double>(__VA_ARGS__);             \
      break;                                 \
    case experimental::DataType::FLOAT16:    \
      func<gloo::float16>(__VA_ARGS__);      \
      break;                                 \
    case experimental::DataType::INT32:      \
      func<int32_t>(__VA_ARGS__);            \
      break;                                 \
    case experimental::DataType::INT64:      \
      func<int64_t>(__VA_ARGS__);            \
      break;                                 \
    default:                                 \
      VLOG(0) << "Error: Unknown DataType."; \
      exit(-1);                              \
  }

#define HOST_NAME_MAX 256

#else
#define GENERATE_FUNC(type, func, args...)   \
  switch (type) {                            \
    case experimental::DataType::FLOAT32:    \
      func<float>(args);                     \
      break;                                 \
    case experimental::DataType::FLOAT64:    \
      func<double>(args);                    \
      break;                                 \
    case experimental::DataType::FLOAT16:    \
      func<gloo::float16>(args);             \
      break;                                 \
    case experimental::DataType::INT32:      \
      func<int32_t>(args);                   \
      break;                                 \
    case experimental::DataType::INT64:      \
      func<int64_t>(args);                   \
      break;                                 \
    default:                                 \
      VLOG(0) << "Error: Unknown DataType."; \
      exit(-1);                              \
  }
#endif

typedef void (*reduce_func)(void*, const void*, const void*, size_t);

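// Maps a ReduceOp to the corresponding gloo element-wise reduction kernel.
// ReduceOp::AVG has no gloo counterpart here, so it aborts.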
template <typename T>
reduce_func get_function(const ReduceOp& r) {
  switch (r) {
    case ReduceOp::SUM:
      return reduce_func(&::gloo::sum<T>);
    case ReduceOp::PRODUCT:
      return reduce_func(&::gloo::product<T>);
    case ReduceOp::MIN:
      return reduce_func(&::gloo::min<T>);
    case ReduceOp::MAX:
      return reduce_func(&::gloo::max<T>);
    case ReduceOp::AVG:
      VLOG(0) << "Error: Unsupported ReduceOp::AVG.";
      exit(-1);
  }

  VLOG(0) << "Error: Unknown ReduceOp.";
  exit(-1);
}

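// Helpers that expose the raw buffers of DenseTensors as typed pointers,
// which is the form gloo's collective options expect.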
template <typename T>
T* get_data(phi::DenseTensor& tensor) {  // NOLINT
  return reinterpret_cast<T*>(tensor.data());
}

template <typename T>
std::vector<T*> get_multi_data(
    std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  std::vector<T*> ret;
  ret.reserve(tensors.size());
  for (size_t i = 0; i < tensors.size(); i++) {
    ret.push_back(get_data<T>(tensors[i]));
  }
  return ret;
}

template <typename T, typename P>
void set_output(P& opts, phi::DenseTensor& tensor) {  // NOLINT
  opts.setOutput(get_data<T>(tensor), tensor.numel());
}

template <typename T, typename P>
void set_input(P& opts, phi::DenseTensor& tensor) {  // NOLINT
  opts.setInput(get_data<T>(tensor), tensor.numel());
}

template <typename T, typename P>
void set_outputs(P& opts,                                   // NOLINT
                 std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  opts.setOutputs(get_multi_data<T>(tensors), tensors[0].numel());
}

template <typename T, typename P>
void set_inputs(P& opts,                                   // NOLINT
                std::vector<phi::DenseTensor>& tensors) {  // NOLINT
  opts.setInputs(get_multi_data<T>(tensors), tensors[0].numel());
}

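// Registers `nranks` equally sized chunks of a single tensor as the scatter
// inputs, so the root rank supplies one chunk per destination rank.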
template <typename T, typename P>
void set_inputs_for_scatter(P& opts,                   // NOLINT
                            phi::DenseTensor& tensor,  // NOLINT
                            int nranks) {
  std::vector<T*> ret;
  ret.reserve(nranks);
  T* raw_pointer = reinterpret_cast<T*>(tensor.data());
  size_t offset = 0;
  for (int i = 0; i < nranks; i++) {
    ret.push_back(raw_pointer + offset);
    offset += tensor.numel() / nranks;
  }
  opts.setInputs(ret, tensor.numel() / nranks);
}

ProcessGroupGloo::GlooTask::GlooTask(
    int rank, const std::vector<phi::DenseTensor>& inputs, CommType comm_type)
    : ProcessGroup::Task(rank, inputs, comm_type) {}

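// Rendezvous happens through the shared store: the group id is used as a key
// prefix so multiple process groups can share one store, and connectFullMesh
// establishes pairwise connections between all ranks.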
ProcessGroupGloo::ProcessGroupGloo(
    const std::shared_ptr<distributed::Store>& store,
    int rank,
    int world_size,
    const platform::Place& place,
    int gid,
    const std::shared_ptr<GlooOptions> options)
    : ProcessGroup(rank, world_size, place, gid),
      _tag(0),
      _store(new GlooStore(store)) {
  _context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
  auto prefix_store =
      ::gloo::rendezvous::PrefixStore(std::to_string(gid), *_store);
  _context->connectFullMesh(prefix_store, options->device);
}

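// Broadcast: the root rank registers its tensor as the input, every rank
// (including the root) registers an output buffer, and the tag keeps
// concurrent collectives on the same context from interfering.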
class BroadcastGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  BroadcastGlooTask(const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    int rank,
                    int root,
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::BROADCAST),
        _context(context),
        _root(root),
        _inputs(inputs),
        _outputs(outputs),
        _tag(tag) {}

  void Run() override { _do_broadcast(_inputs[0], _outputs[0]); }

 private:
  std::shared_ptr<gloo::Context> _context;
  const int _root;
  std::vector<phi::DenseTensor> _inputs{};
  std::vector<phi::DenseTensor> _outputs{};
  const uint32_t _tag;

  void _do_broadcast(phi::DenseTensor& in, phi::DenseTensor& out) {  // NOLINT
    gloo::BroadcastOptions opts(_context);
    const auto& dtype = in.dtype();
    if (rank_ == _root) {
      GENERATE_FUNC(dtype, set_input, opts, in);
    }
    GENERATE_FUNC(dtype, set_output, opts, out);
    opts.setRoot(_root);
    opts.setTag(_tag);
    gloo::broadcast(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const BroadcastOptions& opts) {
  auto root = opts.source_rank;
  std::unique_ptr<BroadcastGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_unique<BroadcastGlooTask>(
      context, inputs, outputs, rank_, root, tag);
  task->Run();
  return task;
}

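// Allreduce: every rank contributes its inputs and receives the element-wise
// reduction (sum/product/min/max, selected via ReduceOp) in its outputs.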
class AllreduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllreduceGlooTask(int rank,
                    const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    ReduceOp reduce_op,
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLREDUCE),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _reduce_op(reduce_op),
        _tag(tag) {}

  void Run() override { _do_allreduce(_inputs, _outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  const ReduceOp _reduce_op;
  uint32_t _tag;

  gloo::AllreduceOptions::Func _get_function(const experimental::DataType type,
                                             const ReduceOp op) {
    gloo::AllreduceOptions::Func fn;
    GENERATE_FUNC(type, _get_function_impl, fn, op);
    return fn;
  }

  template <typename T>
  void _get_function_impl(gloo::AllreduceOptions::Func& fn,  // NOLINT
                          const ReduceOp op) {
    fn = get_function<T>(op);
  }

  void _do_allreduce(std::vector<phi::DenseTensor>& ins,     // NOLINT
                     std::vector<phi::DenseTensor>& outs) {  // NOLINT
    const auto& dtype = ins[0].dtype();
    gloo::AllreduceOptions opts(_context);
    GENERATE_FUNC(dtype, set_inputs, opts, ins);
    GENERATE_FUNC(dtype, set_outputs, opts, outs);
    opts.setReduceFunction(_get_function(dtype, _reduce_op));
    opts.setTag(_tag);
    gloo::allreduce(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const AllreduceOptions& opts) {
  auto tag = next_tag();
  std::shared_ptr<GlooTask> task;
  auto context = get_context();
  task = std::make_shared<AllreduceGlooTask>(
      rank_, context, inputs, outputs, opts.reduce_op, tag);
  task->Run();
  return task;
}

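// Barrier: a collective with no tensor payload; it only returns once every
// rank in the group has entered it.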
class BarrierGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  BarrierGlooTask(int rank, const std::shared_ptr<gloo::Context>& context)
      : ProcessGroupGloo::GlooTask(
            rank, std::vector<phi::DenseTensor>{}, CommType::BARRIER),
        _context(context) {}

  void Run() override { _do_barrier(); }

 private:
  std::shared_ptr<gloo::Context> _context;

  void _do_barrier() {
    gloo::BarrierOptions opts(_context);
    gloo::barrier(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Barrier(
    const BarrierOptions& opts) {
  std::shared_ptr<BarrierGlooTask> task;
  auto context = get_context();
  task = std::make_shared<BarrierGlooTask>(rank_, context);
  task->Run();
  return task;
}

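// Allgather: each rank contributes in[0] and receives the concatenation of
// all ranks' contributions in out[0], so out[0] is expected to hold one copy
// of in[0] per rank.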
class AllgatherGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllgatherGlooTask(int rank,
                    const std::shared_ptr<gloo::Context>& context,
                    std::vector<phi::DenseTensor>& inputs,   // NOLINT
                    std::vector<phi::DenseTensor>& outputs,  // NOLINT
                    uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLGATHER),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _tag(tag) {}

  void Run() override { _do_allgather(_inputs, _outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  uint32_t _tag;

  void _do_allgather(std::vector<phi::DenseTensor>& in,     // NOLINT
                     std::vector<phi::DenseTensor>& out) {  // NOLINT
    const auto& dtype = in[0].dtype();
    gloo::AllgatherOptions opts(_context);
    GENERATE_FUNC(dtype, set_input, opts, in[0]);
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setTag(_tag);
    gloo::allgather(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllGather(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors) {
  std::shared_ptr<AllgatherGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<AllgatherGlooTask>(
      rank_, context, in_tensors, out_tensors, tag);
  task->Run();
  return task;
}

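// Reduce: like allreduce, but only the destination rank `dst` receives the
// reduced result.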
class ReduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  ReduceGlooTask(int rank,
                 const std::shared_ptr<gloo::Context>& context,
                 std::vector<phi::DenseTensor>& inputs,   // NOLINT
                 std::vector<phi::DenseTensor>& outputs,  // NOLINT
                 ReduceOp reduce_op,
                 int dst,
                 uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::REDUCE),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _reduce_op(reduce_op),
        _dst(dst),
        _tag(tag) {}

  void Run() override { _do_reduce(_inputs, _outputs, _dst); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  const ReduceOp _reduce_op;
  int _dst;
  uint32_t _tag;

  gloo::ReduceOptions::Func _get_function(const experimental::DataType type,
                                          const ReduceOp op) {
    gloo::ReduceOptions::Func fn;
    GENERATE_FUNC(type, _get_function_impl, fn, op);
    return fn;
  }

  template <typename T>
  void _get_function_impl(gloo::ReduceOptions::Func& fn,  // NOLINT
                          const ReduceOp op) {
    fn = get_function<T>(op);
  }

  void _do_reduce(std::vector<phi::DenseTensor>& inputs,   // NOLINT
                  std::vector<phi::DenseTensor>& outputs,  // NOLINT
                  int dst) {
    const auto& dtype = inputs[0].dtype();
    gloo::ReduceOptions opts(_context);
    GENERATE_FUNC(dtype, set_input, opts, inputs[0]);
    GENERATE_FUNC(dtype, set_output, opts, outputs[0]);
    opts.setReduceFunction(_get_function(dtype, _reduce_op));
    opts.setTag(_tag);
    opts.setRoot(dst);
    gloo::reduce(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
    std::vector<phi::DenseTensor>& inputs,
    std::vector<phi::DenseTensor>& outputs,
    const ReduceOptions& opts) {
  std::shared_ptr<ReduceGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<ReduceGlooTask>(
      rank_, context, inputs, outputs, opts.reduce_op, opts.root_rank, tag);
  task->Run();
  return task;
}

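// Scatter: the source rank splits in[0] into equal chunks, one per rank, and
// each rank receives its chunk in out[0]; non-source ranks only receive.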
class ScatterGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  ScatterGlooTask(int rank,
                  const std::shared_ptr<gloo::Context>& context,
                  std::vector<phi::DenseTensor>& inputs,   // NOLINT
                  std::vector<phi::DenseTensor>& outputs,  // NOLINT
                  int src,
                  int size,
                  uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, inputs, CommType::SCATTER),
        _context(context),
        _inputs(inputs),
        _outputs(outputs),
        _src(src),
        _size(size),
        _tag(tag) {}

  void Run() override { _do_scatter(_inputs, _outputs, _src); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  std::vector<phi::DenseTensor> _outputs;
  int _src;
  int _size;
  uint32_t _tag;

  void _do_scatter(std::vector<phi::DenseTensor>& in,   // NOLINT
                   std::vector<phi::DenseTensor>& out,  // NOLINT
                   int src) {
    const auto& dtype = in[0].dtype();
    gloo::ScatterOptions opts(_context);
    if (rank_ == src) {
      GENERATE_FUNC(dtype, set_inputs_for_scatter, opts, in[0], _size);
    }
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setRoot(src);
    opts.setTag(_tag);
    gloo::scatter(opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
    std::vector<phi::DenseTensor>& in_tensors,
    std::vector<phi::DenseTensor>& out_tensors,
    const ScatterOptions& opts) {
  std::shared_ptr<ScatterGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_shared<ScatterGlooTask>(
      rank_, context, in_tensors, out_tensors, opts.root_rank, size_, tag);
  task->Run();
  return task;
}

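// Device creation helpers: a gloo TCP device can be pinned to a specific
// network interface or hostname. createDefaultDevice resolves the local
// hostname, probes that the resolved address is bindable, and falls back to
// the loopback address otherwise.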
std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForInterface(const std::string& ifname) {
  ::gloo::transport::tcp::attr attr;
  attr.iface = ifname;
  return ::gloo::transport::tcp::CreateDevice(attr);
}

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForHostname(const std::string& hostname) {
  ::gloo::transport::tcp::attr attr;
  attr.hostname = hostname;
  return ::gloo::transport::tcp::CreateDevice(attr);
}

std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDefaultDevice() {
  std::array<char, HOST_NAME_MAX> hostname{};
  auto ret = ::gethostname(hostname.data(), HOST_NAME_MAX);
  PADDLE_ENFORCE_EQ(
      ret,
      0,
      platform::errors::Fatal("Get hostname error for createDefaultDevice."));
  ::addrinfo* result;
  result = tcputils::get_addr_info(hostname.data(), "", 0, AF_UNSPEC);
  ::addrinfo* cur;
  for (cur = result; cur != nullptr; cur = cur->ai_next) {
    SocketType socket =
        ::socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
    if (socket == -1) {
      continue;
    }
    ret = ::bind(socket, cur->ai_addr, cur->ai_addrlen);
#ifdef _WIN32
    closesocket(socket);
#else
    close(socket);
#endif
    if (ret == -1) {
      continue;
    }
    break;
  }
  freeaddrinfo(result);
  if (cur != nullptr) {
    return createDeviceForHostname(hostname.data());
  }
  return createDeviceForHostname("127.0.0.1");
}

}  // namespace distributed
}  // namespace paddle