/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include #ifdef _POSIX_C_SOURCE #undef _POSIX_C_SOURCE #endif #ifdef _XOPEN_SOURCE #undef _XOPEN_SOURCE #endif #include "paddle/fluid/distributed/collective/ProcessGroup.h" #include "paddle/fluid/distributed/collective/Types.h" #include "paddle/fluid/distributed/collective/reducer.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/imperative/layer.h" #include "paddle/fluid/pybind/distributed_py.h" #include "paddle/fluid/pybind/eager_utils.h" #include "paddle/phi/api/all.h" #if defined(PADDLE_WITH_NCCL) #include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h" #endif #if defined(PADDLE_WITH_GLOO) #include "paddle/fluid/distributed/collective/ProcessGroupGloo.h" #include "paddle/fluid/distributed/store/tcp_store.h" #endif namespace py = pybind11; namespace paddle { namespace pybind { using Tensor = paddle::experimental::Tensor; #if defined(PADDLE_WITH_GLOO) using ProcessGroupGloo = paddle::distributed::ProcessGroupGloo; using GlooStore = paddle::distributed::ProcessGroupGloo::GlooStore; using GlooOptions = paddle::distributed::ProcessGroupGloo::GlooOptions; #endif static std::string GLOO_SOCKET_IFNAME_ENV = "GLOO_SOCKET_IFNAME"; // NOLINT void BindDistributed(py::module *m) { py::enum_(*m, "ReduceOp") .value("SUM", distributed::ReduceOp::SUM) .value("AVG", distributed::ReduceOp::AVG) .value("MAX", distributed::ReduceOp::MAX) .value("MIN", distributed::ReduceOp::MIN) .value("PRODUCT", distributed::ReduceOp::PRODUCT); py::class_(*m, "AllreduceOptions") .def(py::init<>()) .def_readwrite("reduce_op", &distributed::AllreduceOptions::reduce_op); py::class_(*m, "BroadcastOptions") .def(py::init<>()) .def_readwrite("source_rank", &distributed::BroadcastOptions::source_rank) .def_readwrite("source_root", &distributed::BroadcastOptions::source_root); py::class_(*m, "BarrierOptions") .def(py::init<>()) .def_readwrite("place_ids", &distributed::BarrierOptions::place_ids); py::class_(*m, "ReduceOptions") .def(py::init<>()) .def_readwrite("reduce_op", &distributed::ReduceOptions::reduce_op) .def_readwrite("source_root", &distributed::ReduceOptions::root_rank); auto ProcessGroup = py::class_>(*m, "ProcessGroup") .def("rank", &distributed::ProcessGroup::GetRank) .def("size", &distributed::ProcessGroup::GetSize) .def("name", &distributed::ProcessGroup::GetBackendName) .def("allreduce", [](distributed::ProcessGroup &self, py::handle py_tensor, distributed::ReduceOp op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); distributed::AllreduceOptions opts; opts.reduce_op = op; std::vector tensors = {tensor}; return self.AllReduce(tensors, opts); }, py::arg("tensor"), py::arg("op") = distributed::ReduceOp::SUM, py::call_guard()) .def("broadcast", [](distributed::ProcessGroup &self, py::handle py_tensor, int source_rank) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); distributed::BroadcastOptions opts; opts.source_rank = source_rank; std::vector tensors = {tensor}; return self.Broadcast(tensors, opts); }, py::arg("tensor"), py::arg("source_rank"), py::call_guard()) .def("barrier", [](distributed::ProcessGroup &self, std::vector place_ids) { distributed::BarrierOptions opts; opts.place_ids = place_ids; return self.Barrier(opts); }, py::arg("place_ids") = std::vector{}, py::call_guard()) .def("send", [](distributed::ProcessGroup &self, py::handle py_tensor, int dst) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); std::vector tensors = {tensor}; return self.Send(tensors, dst); }, py::arg("tensor"), py::arg("dst"), py::call_guard()) .def("recv", [](distributed::ProcessGroup &self, py::handle py_tensor, int src) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); std::vector tensors = {tensor}; return self.Recv(tensors, src); }, py::arg("tensor"), py::arg("src"), py::call_guard()) .def("all_gather", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); std::vector in_tensors = {in_tensor}; std::vector out_tensors = {out_tensor}; return self.AllGather(in_tensors, out_tensors); }, py::arg("in"), py::arg("out"), py::call_guard()) .def("alltoall", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); std::vector in_tensors = {in_tensor}; std::vector out_tensors = {out_tensor}; return self.AllToAll(in_tensors, out_tensors); }, py::arg("in"), py::arg("out"), py::call_guard()) .def("reduce", [](distributed::ProcessGroup &self, py::handle py_in_tensor, int dst, distributed::ReduceOp op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); distributed::ReduceOptions opts; opts.reduce_op = op; opts.root_rank = dst; std::vector tensors = {in_tensor}; return self.Reduce(tensors, opts); }, py::arg("tensor"), py::arg("dst"), py::arg("op") = distributed::ReduceOp::SUM, py::call_guard()) .def("scatter", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor, int src) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); distributed::ScatterOptions opts; opts.root_rank = src; std::vector in_tensors = {in_tensor}; std::vector out_tensors = {out_tensor}; return self.Scatter(in_tensors, out_tensors, opts); }, py::arg("in"), py::arg("out"), py::arg("src"), py::call_guard()); #if defined(PADDLE_WITH_NCCL) py::class_>( *m, "ProcessGroupNCCL", ProcessGroup) .def(py::init(), py::call_guard()); #endif py::class_>(*m, "task") .def("is_completed", &distributed::ProcessGroup::Task::IsCompleted) .def("wait", &distributed::ProcessGroup::Task::Wait, py::arg("timeout") = kWaitTimeout, py::call_guard()) .def("synchronize", &distributed::ProcessGroup::Task::Synchronize, py::call_guard()); // define parallel strategy, it will be removed py::class_ pg_strategy( *m, "ProcessGroupStrategy", ""); pg_strategy.def(py::init()) .def_property("nranks", [](const distributed::ProcessGroupStrategy &self) { return self.nranks_; }, [](distributed::ProcessGroupStrategy &self, int nranks) { self.nranks_ = nranks; }) .def_property("local_rank", [](const distributed::ProcessGroupStrategy &self) { return self.local_rank_; }, [](distributed::ProcessGroupStrategy &self, int local_rank) { self.local_rank_ = local_rank; }) .def_property( "trainer_endpoints", [](const distributed::ProcessGroupStrategy &self) { return self.trainer_endpoints_; }, [](distributed::ProcessGroupStrategy &self, std::vector eps) { self.trainer_endpoints_ = eps; }) .def_property("current_endpoint", [](const distributed::ProcessGroupStrategy &self) { return self.current_endpoint_; }, [](distributed::ProcessGroupStrategy &self, const std::string &ep) { self.current_endpoint_ = ep; }) .def_property("nrings", [](const distributed::ProcessGroupStrategy &self) { return self.nrings_; }, [](distributed::ProcessGroupStrategy &self, int nrings) { self.nrings_ = nrings; }); #if defined(PADDLE_WITH_GLOO) py::class_(*m, "GlooOptions") .def(py::init<>()) .def_readwrite("_device", &GlooOptions::device) .def_static("create", &GlooOptions::create); py::class_>(*m, "GlooStore") .def(py::init( [](const std::shared_ptr &store) { return std::make_shared(store); }), py::call_guard()); py::class_>( *m, "ProcessGroupGloo", ProcessGroup) .def(py::init &, int, int, std::shared_ptr &>(), py::call_guard()) .def(py::init([](const std::shared_ptr &store, int rank, int world_size) { auto opts = GlooOptions::create(); char *ifname = getenv(GLOO_SOCKET_IFNAME_ENV.c_str()); if (ifname && strlen(ifname) > 1) { opts->device = ProcessGroupGloo::createDeviceForInterface( std::string(ifname)); } else { opts->device = ProcessGroupGloo::createDefaultDevice(); } return std::make_shared(store, rank, world_size, opts); }), py::arg("store"), py::arg("rank"), py::arg("world_size"), // py::arg("timeout") = // kProcessGroupDefaultTimeout, py::call_guard()) .def_static("create_default_device", &ProcessGroupGloo::createDefaultDevice); #endif m->def("eager_assign_group_by_size", [](py::handle py_tensors, std::vector is_sparse_gradient, std::vector group_size_limits, std::vector tensor_indices) { auto tensors = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0); return distributed::Eager_AssignGroupBySize( tensors, is_sparse_gradient, group_size_limits, tensor_indices); }, py::arg("tensors"), py::arg("is_sparse_gradient"), py::arg("group_size_limits") = std::vector{25 * 1024 * 1024}, py::arg("tensor_indices") = std::vector{}, py::call_guard()); } } // end namespace pybind } // namespace paddle