Unverified · Commit b8850521 authored by yuehuayingxueluo, committed by GitHub

Add Gloo SendRecv Function (#52221)

* add gloo  send_recv

* fix code_style

* fix CI bug

* fix send_recv.cc

* add send_recv without sync_op

* fix send_recv test

* fix gather.cc
Parent a523f6b3
@@ -11,7 +11,7 @@ cc_library(
 if(WITH_DISTRIBUTE)
   cc_library(
     process_group_gloo
-    SRCS process_group_gloo.cc
+    SRCS process_group_gloo.cc send_recv.cc
     DEPS phi_api eager_api gloo_wrapper tcp_store)
 endif()
...
@@ -30,6 +30,7 @@
#include "paddle/fluid/distributed/collective/common.h"
#include "paddle/fluid/distributed/collective/process_group_gloo.h"
#include "paddle/fluid/distributed/collective/send_recv.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/platform/enforce.h"

@@ -261,6 +262,107 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
  return task;
}
// Point-to-point send task: registers the input tensor with a Gloo unbound
// buffer and pushes it to `dst_rank`.
class SendGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  SendGlooTask(const std::shared_ptr<gloo::Context>& context,
               std::vector<phi::DenseTensor>* inputs,
               int rank,
               int dst_rank,
               uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, *inputs, CommType::SEND),
        _context(context),
        _inputs(*inputs),
        _dst(dst_rank),
        _tag(tag) {}

  void Run() override { _do_send(_inputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _inputs;
  int _dst;
  uint32_t _tag;

  void _do_send(std::vector<phi::DenseTensor>& in) {  // NOLINT
    SendRecvOptions opts(_context);
    const auto& dtype = in[0].dtype();
    GENERATE_FUNC(dtype, set_input, opts, in[0]);
    opts.setSrc(_context.get()->rank);
    opts.setDst(_dst);
    opts.setTag(_tag);
    send_recv(&opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Send(
    const phi::DenseTensor& tensor, int dst_rank, bool sync_op) {
  // The Gloo task runs synchronously below, so sync_op is not used here.
  std::vector<phi::DenseTensor> in_wrapper{tensor};
  return Send(in_wrapper, dst_rank);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Send(
    std::vector<phi::DenseTensor>& inputs, int dst_rank) {
  std::unique_ptr<SendGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task = std::make_unique<SendGlooTask>(context, &inputs, rank_, dst_rank, tag);
  task->Run();
  return task;
}

// Point-to-point receive task: registers the output tensor with a Gloo
// unbound buffer and waits for data from `src_rank`.
class RecvGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  RecvGlooTask(const std::shared_ptr<gloo::Context>& context,
               std::vector<phi::DenseTensor>* outputs,
               int rank,
               int src_rank,
               uint32_t tag)
      : ProcessGroupGloo::GlooTask(rank, *outputs, CommType::RECV),
        _context(context),
        _outputs(*outputs),
        _src(src_rank),
        _tag(tag) {}

  void Run() override { _do_recv(_outputs); }

 private:
  std::shared_ptr<gloo::Context> _context;
  std::vector<phi::DenseTensor> _outputs;
  const int _src;
  const uint32_t _tag;

  void _do_recv(std::vector<phi::DenseTensor>& out) {  // NOLINT
    SendRecvOptions opts(_context);
    const auto& dtype = out[0].dtype();
    GENERATE_FUNC(dtype, set_output, opts, out[0]);
    opts.setSrc(_src);
    opts.setDst(_context.get()->rank);
    opts.setTag(_tag);
    send_recv(&opts);
  }
};

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Recv(
    phi::DenseTensor* tensor, int src_rank, bool sync_op) {
  // The Gloo task runs synchronously below, so sync_op is not used here.
  std::vector<phi::DenseTensor> in_wrapper{*tensor};
  return Recv(in_wrapper, src_rank);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Recv(
    std::vector<phi::DenseTensor>& outputs, int src_rank) {
  std::unique_ptr<RecvGlooTask> task;
  auto tag = next_tag();
  auto context = get_context();
  task =
      std::make_unique<RecvGlooTask>(context, &outputs, rank_, src_rank, tag);
  task->Run();
  return task;
}
class AllreduceGlooTask : public ProcessGroupGloo::GlooTask {
 public:
  AllreduceGlooTask(int rank,
...
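For orientation, a minimal sketch of how the new Send/Recv entry points above are meant to be paired across two ranks; it is not part of the patch, and ExchangeOnce, pg, rank, and tensor are hypothetical names. The tensor is assumed to be pre-allocated with the same shape and dtype on both ranks.

#include "paddle/fluid/distributed/collective/process_group_gloo.h"

// Hypothetical helper: rank 0 sends its tensor to rank 1, rank 1 receives into
// its own pre-allocated tensor. The Gloo tasks above are run synchronously
// inside Send/Recv, so the transfer has completed when each call returns.
// Because next_tag() is a per-process counter, both ranks must issue their
// point-to-point calls in the same order for the slot tags to match.
void ExchangeOnce(paddle::distributed::ProcessGroupGloo* pg,
                  int rank,
                  phi::DenseTensor* tensor) {
  if (rank == 0) {
    pg->Send(*tensor, /*dst_rank=*/1, /*sync_op=*/true);
  } else if (rank == 1) {
    pg->Recv(tensor, /*src_rank=*/0, /*sync_op=*/true);
  }
}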
@@ -132,6 +132,14 @@ class ProcessGroupGloo : public ProcessGroupWithoutStream {
      const BroadcastOptions& opts,
      bool sync_op) override;

  std::shared_ptr<ProcessGroup::Task> Send(const phi::DenseTensor& tensor,
                                           int dst_rank,
                                           bool sync_op) override;

  std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
                                           int src_rank,
                                           bool sync_op) override;

  std::shared_ptr<ProcessGroup::Task> Reduce(phi::DenseTensor* out_tensor,
                                             const phi::DenseTensor& in_tensor,
                                             const ReduceOptions& opts,

@@ -154,6 +162,12 @@ class ProcessGroupGloo : public ProcessGroupWithoutStream {
      const BroadcastOptions& opts,
      bool sync_op) override;

  std::shared_ptr<ProcessGroup::Task> Send(
      std::vector<phi::DenseTensor>& inputs, int dst_rank) override;

  std::shared_ptr<ProcessGroup::Task> Recv(
      std::vector<phi::DenseTensor>& outputs, int src_rank) override;

  std::shared_ptr<ProcessGroup::Task> AllReduce(
      std::vector<phi::DenseTensor>& inputs,
      std::vector<phi::DenseTensor>& outputs,
...
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include <cstring>
#include "gloo/common/logging.h"
#include "gloo/math.h"
#include "gloo/types.h"
#include "paddle/fluid/distributed/collective/send_recv.h"
namespace paddle {
namespace distributed {

void send_recv(SendRecvOptions* opts) {
  const auto& context = opts->context;
  gloo::transport::UnboundBuffer* in = opts->in.get();
  gloo::transport::UnboundBuffer* out = opts->out.get();

  const auto slot = gloo::Slot::build(kSendRecvSlotPrefix, opts->tag);

  // Only the source rank posts a send and only the destination rank posts a
  // receive; every other rank is a no-op.
  if (context->rank == opts->src) {
    in->send(opts->dst, slot);
    in->waitSend(opts->timeout);
  } else if (context->rank == opts->dst) {
    out->recv(opts->src, slot);
    out->waitRecv(opts->timeout);
  }
}

}  // namespace distributed
}  // namespace paddle
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once

#include "gloo/context.h"
#include "gloo/transport/unbound_buffer.h"

namespace paddle {
namespace distributed {

constexpr uint8_t kSendRecvSlotPrefix = 0x08;

class SendRecvOptions {
 public:
  explicit SendRecvOptions(const std::shared_ptr<gloo::Context>& context)
      : context(context), timeout(context->getTimeout()) {}

  template <typename T>
  void setInput(T* ptr, size_t elements) {
    this->in = context->createUnboundBuffer(ptr, elements * sizeof(T));
  }

  template <typename T>
  void setOutput(T* ptr, size_t elements) {
    this->out = context->createUnboundBuffer(ptr, elements * sizeof(T));
  }

  void setSrc(int src) { this->src = src; }

  void setDst(int dst) { this->dst = dst; }

  void setTag(uint32_t tag) { this->tag = tag; }

  void setTimeout(std::chrono::milliseconds timeout) {
    this->timeout = timeout;
  }

 protected:
  std::shared_ptr<gloo::Context> context;
  std::unique_ptr<gloo::transport::UnboundBuffer> in;
  std::unique_ptr<gloo::transport::UnboundBuffer> out;

  // Rank of the process that sends.
  int src = -1;

  // Rank of the process that receives.
  int dst = -1;

  // Tag for this operation.
  // Must be unique across operations executing in parallel.
  uint32_t tag = 0;

  // End-to-end timeout for this operation.
  std::chrono::milliseconds timeout;

  friend void send_recv(SendRecvOptions*);
};

void send_recv(SendRecvOptions* opts);

}  // namespace distributed
} // namespace paddle
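To make the intended call pattern of SendRecvOptions concrete, here is a minimal usage sketch. It assumes an already-connected std::shared_ptr<gloo::Context> shared by both ranks (for example, the one ProcessGroupGloo holds); PointToPoint, ctx, and data are hypothetical names, not part of the patch.

#include <cstdint>
#include <memory>
#include <vector>

#include "paddle/fluid/distributed/collective/send_recv.h"

// Rank 0 pushes `data` to rank 1; rank 1 receives into a buffer of the same
// element count. Both ranks must build the options with identical src/dst/tag.
void PointToPoint(const std::shared_ptr<gloo::Context>& ctx,
                  std::vector<float>* data,
                  uint32_t tag) {
  paddle::distributed::SendRecvOptions opts(ctx);
  if (ctx->rank == 0) {
    opts.setInput(data->data(), data->size());   // sender registers its buffer
  } else if (ctx->rank == 1) {
    opts.setOutput(data->data(), data->size());  // receiver registers its buffer
  }
  opts.setSrc(0);
  opts.setDst(1);
  opts.setTag(tag);
  // send_recv() posts the send on the src rank and the recv on the dst rank,
  // then blocks on waitSend/waitRecv up to the timeout taken from the context.
  paddle::distributed::send_recv(&opts);
}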
@@ -14,6 +14,7 @@
import random
import unittest
from copy import deepcopy

import numpy as np

@@ -100,6 +101,30 @@ class TestProcessGroupFp32(unittest.TestCase):
        assert np.array_equal(broadcast_result, tensor_y)
        print("test broadcast api ok")
        # test send_recv
        # rank 0
        x = np.random.random(self.shape).astype(self.dtype)
        tensor_x = paddle.to_tensor(x)
        # rank 1
        y = np.random.random(self.shape).astype(self.dtype)
        tensor_y_1 = paddle.to_tensor(y)
        tensor_y_2 = deepcopy(tensor_y_1)

        send_recv_result_1 = paddle.assign(tensor_x)
        send_recv_result_2 = paddle.assign(tensor_y_2)
        if pg.rank() == 0:
            task = pg.send(tensor_x, 1, True)
        else:
            task = pg.recv(tensor_y_1, 0, True)
            assert np.array_equal(send_recv_result_1, tensor_y_1)

        if pg.rank() == 0:
            task = pg.recv(tensor_x, 1, True)
            assert np.array_equal(send_recv_result_2, tensor_x)
        else:
            task = pg.send(tensor_y_2, 0, True)
        print("test send_recv api ok")
        # test barrier
        # rank 0
        if pg.rank() == 0:
...