From b8850521023ec3248bc9a882a3f98ffb6c664c0c Mon Sep 17 00:00:00 2001
From: yuehuayingxueluo <867460659@qq.com>
Date: Thu, 30 Mar 2023 07:19:15 +0800
Subject: [PATCH] Add Gloo SendRecv Function (#52221)

* add gloo send_recv

* fix code_stype

* fix CI bug

* fix send_recv.cc

* add send_recv without sync_op

* fix send_recv test

* fix gather.cc
---
 .../distributed/collective/CMakeLists.txt     |   2 +-
 .../collective/process_group_gloo.cc          | 102 ++++++++++++++++++
 .../collective/process_group_gloo.h           |  14 +++
 .../fluid/distributed/collective/send_recv.cc |  42 ++++++++
 .../fluid/distributed/collective/send_recv.h  |  68 ++++++++++++
 .../collective/process_group_gloo.py          |  25 +++++
 6 files changed, 252 insertions(+), 1 deletion(-)
 create mode 100644 paddle/fluid/distributed/collective/send_recv.cc
 create mode 100644 paddle/fluid/distributed/collective/send_recv.h

diff --git a/paddle/fluid/distributed/collective/CMakeLists.txt b/paddle/fluid/distributed/collective/CMakeLists.txt
index 2fa52500617..c22f88e0dbf 100644
--- a/paddle/fluid/distributed/collective/CMakeLists.txt
+++ b/paddle/fluid/distributed/collective/CMakeLists.txt
@@ -11,7 +11,7 @@ cc_library(
 if(WITH_DISTRIBUTE)
   cc_library(
     process_group_gloo
-    SRCS process_group_gloo.cc
+    SRCS process_group_gloo.cc send_recv.cc
     DEPS phi_api eager_api gloo_wrapper tcp_store)
 endif()
 
diff --git a/paddle/fluid/distributed/collective/process_group_gloo.cc b/paddle/fluid/distributed/collective/process_group_gloo.cc
index 65372c1cde1..d778652a9ca 100644
--- a/paddle/fluid/distributed/collective/process_group_gloo.cc
+++ b/paddle/fluid/distributed/collective/process_group_gloo.cc
@@ -30,6 +30,7 @@
 
 #include "paddle/fluid/distributed/collective/common.h"
 #include "paddle/fluid/distributed/collective/process_group_gloo.h"
+#include "paddle/fluid/distributed/collective/send_recv.h"
 #include "paddle/fluid/framework/fleet/gloo_wrapper.h"
 #include "paddle/fluid/platform/enforce.h"
 
@@ -261,6 +262,107 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
   return task;
 }
 
+class SendGlooTask : public ProcessGroupGloo::GlooTask {
+ public:
+  SendGlooTask(const std::shared_ptr<gloo::Context>& context,
+               std::vector<phi::DenseTensor>* inputs,
+               int rank,
+               int dst_rank,
+               uint32_t tag)
+      : ProcessGroupGloo::GlooTask(rank, *inputs, CommType::SEND),
+        _context(context),
+        _inputs(*inputs),
+        _dst(dst_rank),
+        _tag(tag) {}
+
+  void Run() override { _do_send(_inputs); }
+
+ private:
+  std::shared_ptr<gloo::Context> _context;
+  std::vector<phi::DenseTensor> _inputs;
+  int _dst;
+  uint32_t _tag;
+
+  void _do_send(std::vector<phi::DenseTensor>& in) {  // NOLINT
+    SendRecvOptions opts(_context);
+    const auto& dtype = in[0].dtype();
+    GENERATE_FUNC(dtype, set_input, opts, in[0]);
+
+    opts.setSrc(_context.get()->rank);
+    opts.setDst(_dst);
+    opts.setTag(_tag);
+    send_recv(&opts);
+  }
+};
+
+std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Send(
+    const phi::DenseTensor& tensor, int dst_rank, bool sync_op) {
+  std::vector<phi::DenseTensor> in_wrapper{tensor};
+  return Send(in_wrapper, dst_rank);
+}
+
+std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Send(
+    std::vector<phi::DenseTensor>& inputs, int dst_rank) {
+  std::unique_ptr<SendGlooTask> task;
+  auto tag = next_tag();
+  auto context = get_context();
+  task = std::make_unique<SendGlooTask>(context, &inputs, rank_, dst_rank, tag);
+  task->Run();
+
+  return task;
+}
+
+class RecvGlooTask : public ProcessGroupGloo::GlooTask {
+ public:
+  RecvGlooTask(const std::shared_ptr<gloo::Context>& context,
+               std::vector<phi::DenseTensor>* outputs,
+               int rank,
+               int src_rank,
+               uint32_t tag)
+      : ProcessGroupGloo::GlooTask(rank, *outputs, CommType::RECV),
+        _context(context),
+        _outputs(*outputs),
+        _src(src_rank),
+        _tag(tag) {}
+
+  void Run() override { _do_recv(_outputs); }
+
+ private:
+  std::shared_ptr<gloo::Context> _context;
+  std::vector<phi::DenseTensor> _outputs;
+  const int _src;
+  const uint32_t _tag;
+
+  void _do_recv(std::vector<phi::DenseTensor>& out) {  // NOLINT
+    SendRecvOptions opts(_context);
+    const auto& dtype = out[0].dtype();
+    GENERATE_FUNC(dtype, set_output, opts, out[0]);
+
+    opts.setSrc(_src);
+    opts.setDst(_context.get()->rank);
+    opts.setTag(_tag);
+    send_recv(&opts);
+  }
+};
+
+std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Recv(
+    phi::DenseTensor* tensor, int src_rank, bool sync_op) {
+  std::vector<phi::DenseTensor> in_wrapper{*tensor};
+  return Recv(in_wrapper, src_rank);
+}
+
+std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Recv(
+    std::vector<phi::DenseTensor>& outputs, int src_rank) {
+  std::unique_ptr<RecvGlooTask> task;
+  auto tag = next_tag();
+  auto context = get_context();
+
+  task =
+      std::make_unique<RecvGlooTask>(context, &outputs, rank_, src_rank, tag);
+  task->Run();
+  return task;
+}
+
 class AllreduceGlooTask : public ProcessGroupGloo::GlooTask {
  public:
   AllreduceGlooTask(int rank,
diff --git a/paddle/fluid/distributed/collective/process_group_gloo.h b/paddle/fluid/distributed/collective/process_group_gloo.h
index 5b41949f521..ba3bad76b27 100644
--- a/paddle/fluid/distributed/collective/process_group_gloo.h
+++ b/paddle/fluid/distributed/collective/process_group_gloo.h
@@ -132,6 +132,14 @@ class ProcessGroupGloo : public ProcessGroupWithoutStream {
                                             const BroadcastOptions& opts,
                                             bool sync_op) override;
 
+  std::shared_ptr<ProcessGroup::Task> Send(const phi::DenseTensor& tensor,
+                                           int dst_rank,
+                                           bool sync_op) override;
+
+  std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
+                                           int src_rank,
+                                           bool sync_op) override;
+
   std::shared_ptr<ProcessGroup::Task> Reduce(phi::DenseTensor* out_tensor,
                                              const phi::DenseTensor& in_tensor,
                                              const ReduceOptions& opts,
@@ -154,6 +162,12 @@ class ProcessGroupGloo : public ProcessGroupWithoutStream {
       const BroadcastOptions& opts,
       bool sync_op) override;
 
+  std::shared_ptr<ProcessGroup::Task> Send(
+      std::vector<phi::DenseTensor>& inputs, int dst_rank) override;
+
+  std::shared_ptr<ProcessGroup::Task> Recv(
+      std::vector<phi::DenseTensor>& outputs, int src_rank) override;
+
   std::shared_ptr<ProcessGroup::Task> AllReduce(
       std::vector<phi::DenseTensor>& inputs,
       std::vector<phi::DenseTensor>& outputs,
diff --git a/paddle/fluid/distributed/collective/send_recv.cc b/paddle/fluid/distributed/collective/send_recv.cc
new file mode 100644
index 00000000000..2079e5972a1
--- /dev/null
+++ b/paddle/fluid/distributed/collective/send_recv.cc
@@ -0,0 +1,42 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <cstring>
+#include <memory>
+
+#include "gloo/common/logging.h"
+#include "gloo/math.h"
+#include "gloo/types.h"
+#include "paddle/fluid/distributed/collective/send_recv.h"
+
+namespace paddle {
+namespace distributed {
+
+void send_recv(SendRecvOptions* opts) {
+  const auto& context = opts->context;
+  gloo::transport::UnboundBuffer* in = opts->in.get();
+  gloo::transport::UnboundBuffer* out = opts->out.get();
+  const auto slot = gloo::Slot::build(kSendRecvSlotPrefix, opts->tag);
+
+  if (context->rank == opts->src) {
+    in->send(opts->dst, slot);
+    in->waitSend(opts->timeout);
+  } else if (context->rank == opts->dst) {
+    out->recv(opts->src, slot);
+    out->waitRecv(opts->timeout);
+  }
+}
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/collective/send_recv.h b/paddle/fluid/distributed/collective/send_recv.h
new file mode 100644
index 00000000000..fa872742b27
--- /dev/null
+++ b/paddle/fluid/distributed/collective/send_recv.h
@@ -0,0 +1,68 @@
+/**
+ * Copyright (c) 2018-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+#pragma once
+
+#include "gloo/context.h"
+#include "gloo/transport/unbound_buffer.h"
+
+namespace paddle {
+namespace distributed {
+
+constexpr uint8_t kSendRecvSlotPrefix = 0x08;
+
+class SendRecvOptions {
+ public:
+  explicit SendRecvOptions(const std::shared_ptr<gloo::Context>& context)
+      : context(context), timeout(context->getTimeout()) {}
+
+  template <typename T>
+  void setInput(T* ptr, size_t elements) {
+    this->in = context->createUnboundBuffer(ptr, elements * sizeof(T));
+  }
+
+  template <typename T>
+  void setOutput(T* ptr, size_t elements) {
+    this->out = context->createUnboundBuffer(ptr, elements * sizeof(T));
+  }
+
+  void setSrc(int src) { this->src = src; }
+
+  void setDst(int dst) { this->dst = dst; }
+
+  void setTag(uint32_t tag) { this->tag = tag; }
+
+  void setTimeout(std::chrono::milliseconds timeout) {
+    this->timeout = timeout;
+  }
+
+ protected:
+  std::shared_ptr<gloo::Context> context;
+  std::unique_ptr<gloo::transport::UnboundBuffer> in;
+  std::unique_ptr<gloo::transport::UnboundBuffer> out;
+
+  // Rank of process to send_recv from.
+  int src = -1;
+
+  // Rank of process to send_recv to.
+  int dst = -1;
+
+  // Tag for this operation.
+  // Must be unique across operations executing in parallel.
+  uint32_t tag = 0;
+
+  // End-to-end timeout for this operation.
+  std::chrono::milliseconds timeout;
+
+  friend void send_recv(SendRecvOptions*);
+};
+
+void send_recv(SendRecvOptions* opts);
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py
index c657088f337..17aa27f1bc0 100644
--- a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py
+++ b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py
@@ -14,6 +14,7 @@
 
 import random
 import unittest
+from copy import deepcopy
 
 import numpy as np
 
@@ -100,6 +101,30 @@ class TestProcessGroupFp32(unittest.TestCase):
         assert np.array_equal(broadcast_result, tensor_y)
         print("test broadcast api ok")
 
+        # test send_recv
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_y_1 = paddle.to_tensor(y)
+        tensor_y_2 = deepcopy(tensor_y_1)
+
+        send_recv_result_1 = paddle.assign(tensor_x)
+        send_recv_result_2 = paddle.assign(tensor_y_2)
+        if pg.rank() == 0:
+            task = pg.send(tensor_x, 1, True)
+        else:
+            task = pg.recv(tensor_y_1, 0, True)
+            assert np.array_equal(send_recv_result_1, tensor_y_1)
+
+        if pg.rank() == 0:
+            task = pg.recv(tensor_x, 1, True)
+            assert np.array_equal(send_recv_result_2, tensor_x)
+        else:
+            task = pg.send(tensor_y_2, 0, True)
+        print("test send_recv api ok")
+
         # test barrier
         # rank 0
         if pg.rank() == 0:
-- 
GitLab
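
For readers who want to drive the new primitive directly, the following is a minimal,
hypothetical sketch; it is not part of the patch. The point_to_point helper and the
plain float buffer are invented for illustration, and inside Paddle the context would
come from ProcessGroupGloo::get_context() with buffers registered through the
GENERATE_FUNC dtype dispatch in SendGlooTask::_do_send / RecvGlooTask::_do_recv.
The option plumbing below mirrors those two methods:

    #include <cstdint>
    #include <memory>
    #include <vector>

    #include "gloo/context.h"
    #include "paddle/fluid/distributed/collective/send_recv.h"

    // Hypothetical helper: move `data` from rank 0 to rank 1 over an
    // already-rendezvoused gloo context.
    void point_to_point(const std::shared_ptr<gloo::Context>& context,
                        std::vector<float>* data,
                        uint32_t tag) {
      paddle::distributed::SendRecvOptions opts(context);
      if (context->rank == 0) {
        // The sender registers its buffer as the input.
        opts.setInput(data->data(), data->size());
      } else if (context->rank == 1) {
        // The receiver registers a buffer of the same byte size as the output.
        opts.setOutput(data->data(), data->size());
      }
      opts.setSrc(0);
      opts.setDst(1);
      opts.setTag(tag);  // must be unique among p2p ops in flight
      // send_recv() posts send()/waitSend() on the source rank and
      // recv()/waitRecv() on the destination rank, bounded by the
      // context's timeout; all other ranks fall through as a no-op.
      paddle::distributed::send_recv(&opts);
    }

Note that ranks other than src and dst never post an operation in send_recv(), so the
call returns immediately for them; only the two endpoints block on completion.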