未验证 提交 5435459a 编写于 作者: L lilong12 提交者: GitHub

add communication api for ProcessGroupGloo (#40100)

* add pg_gloo apis
上级 03eb792d
......@@ -25,6 +25,8 @@
#endif
#include <gloo/broadcast.h>
#include <gloo/reduce.h>
#include <gloo/scatter.h>
#include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/platform/enforce.h"
......@@ -144,6 +146,22 @@ void set_inputs(P& opts, const std::vector<Tensor>& tensors) { // NOLINT
opts.setInputs(get_multi_data<T>(tensors), tensors[0].numel());
}
template <typename T, typename P>
void set_inputs_for_scatter(P& opts, // NOLINT
const std::vector<Tensor>& tensors, // NOLINT
int nranks) {
std::vector<T*> ret(nranks);
auto raw_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[0].impl());
T* raw_pointer = reinterpret_cast<T*>(raw_tensor->data());
size_t offset = 0;
for (int i = 0; i < nranks; i++) {
ret[i] = raw_pointer + offset;
offset += tensors[0].numel() / nranks;
}
opts.setInputs(ret, tensors[0].numel() / nranks);
}
ProcessGroupGloo::GlooTask::GlooTask(int rank,
const std::vector<Tensor>& inputs,
CommType comm_type)
......@@ -257,6 +275,182 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
return task;
}
class BarrierGlooTask : public ProcessGroupGloo::GlooTask {
public:
BarrierGlooTask(int rank, const std::shared_ptr<gloo::Context>& context)
: ProcessGroupGloo::GlooTask(rank, std::vector<Tensor>{},
CommType::BARRIER),
_context(context) {}
void Run() override { _do_barrier(); }
private:
std::shared_ptr<gloo::Context> _context;
void _do_barrier() {
gloo::BarrierOptions opts(_context);
gloo::barrier(opts);
}
};
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Barrier(
const BarrierOptions& opts) {
std::shared_ptr<BarrierGlooTask> task;
auto context = get_context();
task = std::make_shared<BarrierGlooTask>(rank_, context);
task->Run();
return task;
}
class AllgatherGlooTask : public ProcessGroupGloo::GlooTask {
public:
AllgatherGlooTask(int rank, const std::shared_ptr<gloo::Context>& context,
std::vector<Tensor>& inputs, // NOLINT
std::vector<Tensor>& outputs, // NOLINT
uint32_t tag)
: ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLGATHER),
_context(context),
_inputs(inputs),
_outputs(outputs),
_tag(tag) {}
void Run() override { _do_allgather(_inputs, _outputs); }
private:
std::shared_ptr<gloo::Context> _context;
std::vector<Tensor> _inputs;
std::vector<Tensor> _outputs;
uint32_t _tag;
void _do_allgather(std::vector<Tensor>& in, // NOLINT
std::vector<Tensor>& out) { // NOLINT
const auto& dtype = in[0].type();
gloo::AllgatherOptions opts(_context);
GENERATE_FUNC(dtype, set_input, opts, in[0]);
GENERATE_FUNC(dtype, set_output, opts, out[0]);
opts.setTag(_tag);
gloo::allgather(opts);
}
};
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllGather(
std::vector<Tensor>& in_tensors, std::vector<Tensor>& out_tensors) {
std::shared_ptr<AllgatherGlooTask> task;
auto tag = next_tag();
auto context = get_context();
task = std::make_shared<AllgatherGlooTask>(rank_, context, in_tensors,
out_tensors, tag);
task->Run();
return task;
}
class ReduceGlooTask : public ProcessGroupGloo::GlooTask {
public:
ReduceGlooTask(int rank, const std::shared_ptr<gloo::Context>& context,
std::vector<Tensor>& in, ReduceOp reduce_op, // NOLINT
int dst, uint32_t tag)
: ProcessGroupGloo::GlooTask(rank, in, CommType::REDUCE),
_context(context),
_inputs(in),
_reduce_op(reduce_op),
_dst(dst),
_tag(tag) {}
void Run() override { _do_reduce(_inputs, _dst); }
private:
std::shared_ptr<gloo::Context> _context;
std::vector<Tensor> _inputs;
const ReduceOp _reduce_op;
int _dst;
uint32_t _tag;
gloo::ReduceOptions::Func _get_function(const experimental::DataType type,
const ReduceOp op) {
gloo::ReduceOptions::Func fn;
GENERATE_FUNC(type, _get_function_impl, fn, op);
return fn;
}
template <typename T>
void _get_function_impl(gloo::ReduceOptions::Func& fn, // NOLINT
const ReduceOp op) {
fn = get_function<T>(op);
}
void _do_reduce(std::vector<Tensor>& tensors, int dst) { // NOLINT
const auto& dtype = tensors[0].type();
gloo::ReduceOptions opts(_context);
GENERATE_FUNC(dtype, set_input, opts, tensors[0]);
GENERATE_FUNC(dtype, set_output, opts, tensors[0]);
opts.setReduceFunction(_get_function(dtype, _reduce_op));
opts.setTag(_tag);
opts.setRoot(dst);
gloo::reduce(opts);
}
};
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
std::vector<Tensor>& tensors, const ReduceOptions& opts) {
std::shared_ptr<ReduceGlooTask> task;
auto tag = next_tag();
auto context = get_context();
task = std::make_shared<ReduceGlooTask>(rank_, context, tensors,
opts.reduce_op, opts.root_rank, tag);
task->Run();
return task;
}
class ScatterGlooTask : public ProcessGroupGloo::GlooTask {
public:
ScatterGlooTask(int rank, const std::shared_ptr<gloo::Context>& context,
std::vector<Tensor>& inputs, // NOLINT
std::vector<Tensor>& outputs, // NOLINT
int src, int size, uint32_t tag)
: ProcessGroupGloo::GlooTask(rank, inputs, CommType::SCATTER),
_context(context),
_inputs(inputs),
_outputs(outputs),
_src(src),
_size(size),
_tag(tag) {}
void Run() override { _do_scatter(_inputs, _outputs, _src); }
private:
std::shared_ptr<gloo::Context> _context;
std::vector<Tensor> _inputs;
std::vector<Tensor> _outputs;
int _src;
int _size;
uint32_t _tag;
void _do_scatter(std::vector<Tensor>& in, std::vector<Tensor>& out, // NOLINT
int src) {
const auto& dtype = in[0].type();
gloo::ScatterOptions opts(_context);
if (rank_ == src) {
GENERATE_FUNC(dtype, set_inputs_for_scatter, opts, in, _size);
}
GENERATE_FUNC(dtype, set_output, opts, out[0]);
opts.setRoot(src);
opts.setTag(_tag);
gloo::scatter(opts);
}
};
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
std::vector<Tensor>& in_tensors, std::vector<Tensor>& out_tensors,
const ScatterOptions& opts) {
std::shared_ptr<ScatterGlooTask> task;
auto tag = next_tag();
auto context = get_context();
task = std::make_shared<ScatterGlooTask>(
rank_, context, in_tensors, out_tensors, opts.root_rank, size_, tag);
task->Run();
return task;
}
std::shared_ptr<::gloo::transport::Device>
ProcessGroupGloo::createDeviceForInterface(const std::string& ifname) {
::gloo::transport::tcp::attr attr;
......
......@@ -114,6 +114,20 @@ class ProcessGroupGloo : public ProcessGroup {
std::vector<Tensor>& inputs,
const AllreduceOptions& opts = AllreduceOptions()) override;
std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) override;
std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<Tensor>& in_tensors,
std::vector<Tensor>& out_tensors) override;
std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<Tensor>& tensors, const ReduceOptions& opts) override;
std::shared_ptr<ProcessGroup::Task> Scatter(std::vector<Tensor>& in_tensors,
std::vector<Tensor>& out_tensors,
const ScatterOptions&) override;
std::shared_ptr<::gloo::Context> get_context() { return _context; }
uint64_t next_tag() { return _tag++; }
......
......@@ -74,6 +74,7 @@ void MasterDaemon::_do_set(SocketType socket) {
}
void MasterDaemon::_do_get(SocketType socket) {
VLOG(3) << "MasterDaemon::_do_get";
std::string key = tcputils::receive_string(socket);
auto iter = _store.find(key);
PADDLE_ENFORCE_NE(
......@@ -86,13 +87,14 @@ void MasterDaemon::_do_get(SocketType socket) {
void MasterDaemon::_do_stop(SocketType socket) {
VLOG(3) << "MasterDaemon::_do_stop";
ReplyType value = ReplyType::STOP_WAIT;
tcputils::send_value<ReplyType>(socket, value);
if (--_nranks == 0) {
_stop = true;
}
tcputils::send_value<ReplyType>(socket, value);
}
void MasterDaemon::_do_wait(SocketType socket) {
VLOG(3) << "MasterDaemon::_do_wait";
std::string key = tcputils::receive_string(socket);
auto iter = _store.find(key);
auto reply = ReplyType::STOP_WAIT;
......@@ -134,32 +136,42 @@ void MasterDaemon::run() {
}
for (size_t i = 1; i < fds.size(); i++) {
if (fds[i].revents == 0) {
continue;
}
Command command = tcputils::receive_value<Command>(fds[i].fd);
VLOG(3) << "TCPStore: recv command: " << static_cast<int>(command) << ".";
switch (command) {
case Command::ADD:
_do_add(fds[i].fd);
break;
case Command::GET:
_do_get(fds[i].fd);
break;
case Command::SET:
_do_set(fds[i].fd);
break;
case Command::WAIT:
_do_wait(fds[i].fd);
break;
case Command::STOP:
_do_stop(fds[i].fd);
break;
default:
VLOG(0) << "Unknow command: " << static_cast<int>(command);
exit(-1);
VLOG(0) << "fds.size:" << fds.size();
VLOG(0) << "fds.size-i:" << i;
VLOG(0) << "fds[i].revents:" << fds[i].revents;
try {
if (fds[i].revents == 0) {
continue;
}
Command command = tcputils::receive_value<Command>(fds[i].fd);
VLOG(3) << "TCPStore: recv command: " << static_cast<int>(command)
<< ".";
switch (command) {
case Command::ADD:
_do_add(fds[i].fd);
break;
case Command::GET:
_do_get(fds[i].fd);
break;
case Command::SET:
_do_set(fds[i].fd);
break;
case Command::WAIT:
_do_wait(fds[i].fd);
break;
case Command::STOP:
_do_stop(fds[i].fd);
break;
default:
VLOG(0) << "Unknow command: " << static_cast<int>(command);
exit(-1);
}
} catch (...) {
fds.erase(fds.begin() + i);
_sockets.erase(_sockets.begin() + i - 1);
}
}
}
......@@ -281,8 +293,8 @@ void TCPStore::wait(const std::string& key) {
}
TCPStore::~TCPStore() {
_client->send_command_for_key(Command::STOP, "");
VLOG(3) << "~TCPStore";
_client->send_command_for_key(Command::STOP, "");
ReplyType ret = _client->receive_value<ReplyType>();
PADDLE_ENFORCE_EQ(ret, ReplyType::STOP_WAIT,
platform::errors::InvalidArgument(
......
......@@ -104,16 +104,91 @@ class TestProcessGroupFp32(unittest.TestCase):
broadcast_result = paddle.assign(tensor_x)
if rank == 0:
task = pg.broadcast(tensor_x, 0)
task.synchronize()
assert task.is_completed()
assert np.array_equal(broadcast_result, tensor_x)
else:
task = pg.broadcast(tensor_y, 0)
task.synchronize()
assert task.is_completed()
assert np.array_equal(broadcast_result, tensor_y)
print("test broadcast api ok")
# test barrier
# rank 0
if pg.rank() == 0:
task = pg.barrier()
task.wait()
# rank 1
else:
task = pg.barrier()
task.wait()
print("test barrier api ok\n")
# test allgather
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
out_shape = list(self.shape)
out_shape[0] *= 2
out = np.random.random(out_shape).astype(self.dtype)
tensor_out = paddle.to_tensor(out)
if pg.rank() == 0:
task = pg.all_gather(tensor_x, tensor_out)
task.wait()
paddle.device.cuda.synchronize()
# rank 1
else:
task = pg.all_gather(tensor_y, tensor_out)
task.wait()
out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
out_2 = paddle.slice(tensor_out, [0], [out_shape[0] // 2],
[out_shape[0]])
assert np.array_equal(tensor_x, out_1)
assert np.array_equal(tensor_y, out_2)
print("test allgather api ok\n")
# test Reduce
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.reduce(tensor_x, 0)
task.wait()
# rank 1
else:
task = pg.reduce(tensor_y, 0)
task.wait()
if pg.rank() == 0:
assert np.array_equal(tensor_x, sum_result)
print("test reduce sum api ok\n")
# test Scatter
# rank 0
in_shape = list(self.shape)
in_shape[0] *= 2
x = np.random.random(in_shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
if pg.rank() == 0:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
# rank 1
else:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
out2 = paddle.slice(tensor_x, [0], [self.shape[0]],
[self.shape[0] * 2])
if pg.rank() == 0:
assert np.array_equal(tensor_y, out1)
else:
assert np.array_equal(tensor_y, out2)
print("test scatter api ok\n")
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册