From 5435459a81fe68a170ce3ad08c588c3793bea773 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Fri, 4 Mar 2022 14:03:00 +0800 Subject: [PATCH] add communication api for ProcessGroupGloo (#40100) * add pg_gloo apis --- .../collective/ProcessGroupGloo.cc | 194 ++++++++++++++++++ .../distributed/collective/ProcessGroupGloo.h | 14 ++ paddle/fluid/distributed/store/tcp_store.cc | 68 +++--- .../tests/unittests/process_group_gloo.py | 83 +++++++- 4 files changed, 327 insertions(+), 32 deletions(-) diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc index 03ad48f560a..5dc43af1178 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc @@ -25,6 +25,8 @@ #endif #include +#include +#include #include "paddle/fluid/distributed/collective/ProcessGroupGloo.h" #include "paddle/fluid/framework/fleet/gloo_wrapper.h" #include "paddle/fluid/platform/enforce.h" @@ -144,6 +146,22 @@ void set_inputs(P& opts, const std::vector& tensors) { // NOLINT opts.setInputs(get_multi_data(tensors), tensors[0].numel()); } +template +void set_inputs_for_scatter(P& opts, // NOLINT + const std::vector& tensors, // NOLINT + int nranks) { + std::vector ret(nranks); + auto raw_tensor = + std::dynamic_pointer_cast(tensors[0].impl()); + T* raw_pointer = reinterpret_cast(raw_tensor->data()); + size_t offset = 0; + for (int i = 0; i < nranks; i++) { + ret[i] = raw_pointer + offset; + offset += tensors[0].numel() / nranks; + } + opts.setInputs(ret, tensors[0].numel() / nranks); +} + ProcessGroupGloo::GlooTask::GlooTask(int rank, const std::vector& inputs, CommType comm_type) @@ -257,6 +275,182 @@ std::shared_ptr ProcessGroupGloo::AllReduce( return task; } +class BarrierGlooTask : public ProcessGroupGloo::GlooTask { + public: + BarrierGlooTask(int rank, const std::shared_ptr& context) + : ProcessGroupGloo::GlooTask(rank, std::vector{}, + CommType::BARRIER), + _context(context) {} + + void Run() override { _do_barrier(); } + + private: + std::shared_ptr _context; + + void _do_barrier() { + gloo::BarrierOptions opts(_context); + gloo::barrier(opts); + } +}; + +std::shared_ptr ProcessGroupGloo::Barrier( + const BarrierOptions& opts) { + std::shared_ptr task; + auto context = get_context(); + task = std::make_shared(rank_, context); + task->Run(); + return task; +} + +class AllgatherGlooTask : public ProcessGroupGloo::GlooTask { + public: + AllgatherGlooTask(int rank, const std::shared_ptr& context, + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT + uint32_t tag) + : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLGATHER), + _context(context), + _inputs(inputs), + _outputs(outputs), + _tag(tag) {} + + void Run() override { _do_allgather(_inputs, _outputs); } + + private: + std::shared_ptr _context; + std::vector _inputs; + std::vector _outputs; + uint32_t _tag; + + void _do_allgather(std::vector& in, // NOLINT + std::vector& out) { // NOLINT + const auto& dtype = in[0].type(); + gloo::AllgatherOptions opts(_context); + GENERATE_FUNC(dtype, set_input, opts, in[0]); + GENERATE_FUNC(dtype, set_output, opts, out[0]); + opts.setTag(_tag); + gloo::allgather(opts); + } +}; + +std::shared_ptr ProcessGroupGloo::AllGather( + std::vector& in_tensors, std::vector& out_tensors) { + std::shared_ptr task; + auto tag = next_tag(); + auto context = get_context(); + task = std::make_shared(rank_, context, in_tensors, + out_tensors, tag); + task->Run(); + return task; +} + +class ReduceGlooTask : public ProcessGroupGloo::GlooTask { + public: + ReduceGlooTask(int rank, const std::shared_ptr& context, + std::vector& in, ReduceOp reduce_op, // NOLINT + int dst, uint32_t tag) + : ProcessGroupGloo::GlooTask(rank, in, CommType::REDUCE), + _context(context), + _inputs(in), + _reduce_op(reduce_op), + _dst(dst), + _tag(tag) {} + + void Run() override { _do_reduce(_inputs, _dst); } + + private: + std::shared_ptr _context; + std::vector _inputs; + const ReduceOp _reduce_op; + int _dst; + uint32_t _tag; + + gloo::ReduceOptions::Func _get_function(const experimental::DataType type, + const ReduceOp op) { + gloo::ReduceOptions::Func fn; + GENERATE_FUNC(type, _get_function_impl, fn, op); + return fn; + } + + template + void _get_function_impl(gloo::ReduceOptions::Func& fn, // NOLINT + const ReduceOp op) { + fn = get_function(op); + } + + void _do_reduce(std::vector& tensors, int dst) { // NOLINT + const auto& dtype = tensors[0].type(); + gloo::ReduceOptions opts(_context); + GENERATE_FUNC(dtype, set_input, opts, tensors[0]); + GENERATE_FUNC(dtype, set_output, opts, tensors[0]); + opts.setReduceFunction(_get_function(dtype, _reduce_op)); + opts.setTag(_tag); + opts.setRoot(dst); + gloo::reduce(opts); + } +}; + +std::shared_ptr ProcessGroupGloo::Reduce( + std::vector& tensors, const ReduceOptions& opts) { + std::shared_ptr task; + auto tag = next_tag(); + auto context = get_context(); + task = std::make_shared(rank_, context, tensors, + opts.reduce_op, opts.root_rank, tag); + task->Run(); + return task; +} + +class ScatterGlooTask : public ProcessGroupGloo::GlooTask { + public: + ScatterGlooTask(int rank, const std::shared_ptr& context, + std::vector& inputs, // NOLINT + std::vector& outputs, // NOLINT + int src, int size, uint32_t tag) + : ProcessGroupGloo::GlooTask(rank, inputs, CommType::SCATTER), + _context(context), + _inputs(inputs), + _outputs(outputs), + _src(src), + _size(size), + _tag(tag) {} + + void Run() override { _do_scatter(_inputs, _outputs, _src); } + + private: + std::shared_ptr _context; + std::vector _inputs; + std::vector _outputs; + int _src; + int _size; + uint32_t _tag; + + void _do_scatter(std::vector& in, std::vector& out, // NOLINT + int src) { + const auto& dtype = in[0].type(); + gloo::ScatterOptions opts(_context); + if (rank_ == src) { + GENERATE_FUNC(dtype, set_inputs_for_scatter, opts, in, _size); + } + GENERATE_FUNC(dtype, set_output, opts, out[0]); + opts.setRoot(src); + opts.setTag(_tag); + gloo::scatter(opts); + } +}; + +std::shared_ptr ProcessGroupGloo::Scatter( + std::vector& in_tensors, std::vector& out_tensors, + const ScatterOptions& opts) { + std::shared_ptr task; + auto tag = next_tag(); + auto context = get_context(); + task = std::make_shared( + rank_, context, in_tensors, out_tensors, opts.root_rank, size_, tag); + task->Run(); + return task; +} + std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::createDeviceForInterface(const std::string& ifname) { ::gloo::transport::tcp::attr attr; diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.h b/paddle/fluid/distributed/collective/ProcessGroupGloo.h index d989939fcb8..24f156571a4 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.h +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.h @@ -114,6 +114,20 @@ class ProcessGroupGloo : public ProcessGroup { std::vector& inputs, const AllreduceOptions& opts = AllreduceOptions()) override; + std::shared_ptr Barrier( + const BarrierOptions& = BarrierOptions()) override; + + std::shared_ptr AllGather( + std::vector& in_tensors, + std::vector& out_tensors) override; + + std::shared_ptr Reduce( + std::vector& tensors, const ReduceOptions& opts) override; + + std::shared_ptr Scatter(std::vector& in_tensors, + std::vector& out_tensors, + const ScatterOptions&) override; + std::shared_ptr<::gloo::Context> get_context() { return _context; } uint64_t next_tag() { return _tag++; } diff --git a/paddle/fluid/distributed/store/tcp_store.cc b/paddle/fluid/distributed/store/tcp_store.cc index 8675981955d..eb98c89c99e 100644 --- a/paddle/fluid/distributed/store/tcp_store.cc +++ b/paddle/fluid/distributed/store/tcp_store.cc @@ -74,6 +74,7 @@ void MasterDaemon::_do_set(SocketType socket) { } void MasterDaemon::_do_get(SocketType socket) { + VLOG(3) << "MasterDaemon::_do_get"; std::string key = tcputils::receive_string(socket); auto iter = _store.find(key); PADDLE_ENFORCE_NE( @@ -86,13 +87,14 @@ void MasterDaemon::_do_get(SocketType socket) { void MasterDaemon::_do_stop(SocketType socket) { VLOG(3) << "MasterDaemon::_do_stop"; ReplyType value = ReplyType::STOP_WAIT; + tcputils::send_value(socket, value); if (--_nranks == 0) { _stop = true; } - tcputils::send_value(socket, value); } void MasterDaemon::_do_wait(SocketType socket) { + VLOG(3) << "MasterDaemon::_do_wait"; std::string key = tcputils::receive_string(socket); auto iter = _store.find(key); auto reply = ReplyType::STOP_WAIT; @@ -134,32 +136,42 @@ void MasterDaemon::run() { } for (size_t i = 1; i < fds.size(); i++) { - if (fds[i].revents == 0) { - continue; - } - - Command command = tcputils::receive_value(fds[i].fd); - VLOG(3) << "TCPStore: recv command: " << static_cast(command) << "."; - - switch (command) { - case Command::ADD: - _do_add(fds[i].fd); - break; - case Command::GET: - _do_get(fds[i].fd); - break; - case Command::SET: - _do_set(fds[i].fd); - break; - case Command::WAIT: - _do_wait(fds[i].fd); - break; - case Command::STOP: - _do_stop(fds[i].fd); - break; - default: - VLOG(0) << "Unknow command: " << static_cast(command); - exit(-1); + VLOG(0) << "fds.size:" << fds.size(); + VLOG(0) << "fds.size-i:" << i; + VLOG(0) << "fds[i].revents:" << fds[i].revents; + + try { + if (fds[i].revents == 0) { + continue; + } + + Command command = tcputils::receive_value(fds[i].fd); + VLOG(3) << "TCPStore: recv command: " << static_cast(command) + << "."; + + switch (command) { + case Command::ADD: + _do_add(fds[i].fd); + break; + case Command::GET: + _do_get(fds[i].fd); + break; + case Command::SET: + _do_set(fds[i].fd); + break; + case Command::WAIT: + _do_wait(fds[i].fd); + break; + case Command::STOP: + _do_stop(fds[i].fd); + break; + default: + VLOG(0) << "Unknow command: " << static_cast(command); + exit(-1); + } + } catch (...) { + fds.erase(fds.begin() + i); + _sockets.erase(_sockets.begin() + i - 1); } } } @@ -281,8 +293,8 @@ void TCPStore::wait(const std::string& key) { } TCPStore::~TCPStore() { - _client->send_command_for_key(Command::STOP, ""); VLOG(3) << "~TCPStore"; + _client->send_command_for_key(Command::STOP, ""); ReplyType ret = _client->receive_value(); PADDLE_ENFORCE_EQ(ret, ReplyType::STOP_WAIT, platform::errors::InvalidArgument( diff --git a/python/paddle/fluid/tests/unittests/process_group_gloo.py b/python/paddle/fluid/tests/unittests/process_group_gloo.py index 5420e1d36b3..c62c4615f74 100644 --- a/python/paddle/fluid/tests/unittests/process_group_gloo.py +++ b/python/paddle/fluid/tests/unittests/process_group_gloo.py @@ -104,16 +104,91 @@ class TestProcessGroupFp32(unittest.TestCase): broadcast_result = paddle.assign(tensor_x) if rank == 0: task = pg.broadcast(tensor_x, 0) - task.synchronize() - assert task.is_completed() assert np.array_equal(broadcast_result, tensor_x) else: task = pg.broadcast(tensor_y, 0) - task.synchronize() - assert task.is_completed() assert np.array_equal(broadcast_result, tensor_y) print("test broadcast api ok") + # test barrier + # rank 0 + if pg.rank() == 0: + task = pg.barrier() + task.wait() + # rank 1 + else: + task = pg.barrier() + task.wait() + + print("test barrier api ok\n") + + # test allgather + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + out_shape = list(self.shape) + out_shape[0] *= 2 + out = np.random.random(out_shape).astype(self.dtype) + tensor_out = paddle.to_tensor(out) + if pg.rank() == 0: + task = pg.all_gather(tensor_x, tensor_out) + task.wait() + paddle.device.cuda.synchronize() + # rank 1 + else: + task = pg.all_gather(tensor_y, tensor_out) + task.wait() + out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) + out_2 = paddle.slice(tensor_out, [0], [out_shape[0] // 2], + [out_shape[0]]) + assert np.array_equal(tensor_x, out_1) + assert np.array_equal(tensor_y, out_2) + print("test allgather api ok\n") + + # test Reduce + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + sum_result = tensor_x + tensor_y + if pg.rank() == 0: + task = pg.reduce(tensor_x, 0) + task.wait() + # rank 1 + else: + task = pg.reduce(tensor_y, 0) + task.wait() + if pg.rank() == 0: + assert np.array_equal(tensor_x, sum_result) + print("test reduce sum api ok\n") + + # test Scatter + # rank 0 + in_shape = list(self.shape) + in_shape[0] *= 2 + x = np.random.random(in_shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + if pg.rank() == 0: + task = pg.scatter(tensor_x, tensor_y, 0) + task.wait() + # rank 1 + else: + task = pg.scatter(tensor_x, tensor_y, 0) + task.wait() + out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) + out2 = paddle.slice(tensor_x, [0], [self.shape[0]], + [self.shape[0] * 2]) + if pg.rank() == 0: + assert np.array_equal(tensor_y, out1) + else: + assert np.array_equal(tensor_y, out2) + print("test scatter api ok\n") + if __name__ == "__main__": unittest.main() -- GitLab