// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/pybind/bind_fleet_executor.h"

#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

#include <algorithm>
#include <cstring>
#include <string>
#include <vector>

#include "paddle/fluid/distributed/fleet_executor/dist_model.h"
#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h"
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/place.h"

namespace py = pybind11;
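
// The FLOAT16 branch of DistModelTypeToNumpyDType() below instantiates
// py::dtype::of<paddle::platform::float16>(). Because float16 is not a C++
// builtin type, pybind11 needs an npy_format_descriptor specialization to
// map it onto numpy.float16. A minimal registration sketch follows
// (cf. https://github.com/pybind/pybind11/issues/1776); it assumes
// paddle::platform::float16 is visible through the headers included above:
namespace pybind11 {
namespace detail {

// numpy's type number for float16:
//   >>> import numpy as np
//   >>> np.dtype(np.float16).num   # 23
constexpr int NPY_FLOAT16_ = 23;

template <>
struct npy_format_descriptor<paddle::platform::float16> {
  static py::dtype dtype() {
    handle ptr = npy_api::get().PyArray_DescrFromType_(NPY_FLOAT16_);
    return reinterpret_borrow<py::dtype>(ptr);
  }
  static std::string format() {
    // "e" is the struct-module format character for a 16-bit float.
    return "e";
  }
  static constexpr auto name = _("float16");
};

}  // namespace detail
}  // namespace pybind11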

namespace paddle {
namespace pybind {

using paddle::distributed::FleetExecutor;
using paddle::distributed::TaskNode;
using paddle::distributed::DistModelConfig;
using paddle::distributed::DistModel;
using paddle::distributed::DistModelDataBuf;
using paddle::distributed::DistModelTensor;
using paddle::distributed::DistModelDataType;
using paddle::framework::OpDesc;
using paddle::framework::ProgramDesc;

template <typename T>
DistModelDataBuf DistModelDataBufCreate(py::array_t<T> data) {
  // accept numpy array directly
  DistModelDataBuf buf(data.size() * sizeof(T));
  std::copy_n(static_cast<const T*>(data.data()), data.size(),
              static_cast<T*>(buf.data()));
  return buf;
}

template <typename T>
void DistModelDataBufReset(DistModelDataBuf& buf,  // NOLINT
                           py::array_t<T> data) {  // NOLINT
  // reset the data with numpy array directly
  buf.Resize(data.size() * sizeof(T));
  std::copy_n(static_cast<const T*>(data.data()), data.size(),
              static_cast<T*>(buf.data()));
}

template <typename T>
DistModelTensor DistModelTensorCreate(
    py::array_t<T> data, const std::string name,
    const std::vector<std::vector<size_t>>& lod, bool copy) {
  DistModelTensor tensor;

  if (copy) {
    // own a copy of the numpy buffer
    DistModelDataBuf buf(data.size() * sizeof(T));
    std::copy_n(static_cast<const T*>(data.data()), data.size(),
                static_cast<T*>(buf.data()));
    tensor.data = std::move(buf);
  } else {
    // zero-copy: the tensor aliases the numpy buffer, so the array must
    // outlive the tensor
    tensor.data =
        DistModelDataBuf(data.mutable_data(), data.size() * sizeof(T));
  }

  tensor.dtype = paddle::distributed::DistModelGetDtype<T>();
  tensor.name = name;
  tensor.lod = lod;
  tensor.shape.resize(data.ndim());
  std::copy_n(data.shape(), data.ndim(), tensor.shape.begin());

  return tensor;
}

py::dtype DistModelTypeToNumpyDType(DistModelDataType dtype) {
  py::dtype dt;
  switch (dtype) {
    case DistModelDataType::INT32:
      dt = py::dtype::of<int32_t>();
      break;
    case DistModelDataType::INT64:
      dt = py::dtype::of<int64_t>();
      break;
    case DistModelDataType::FLOAT32:
      dt = py::dtype::of<float>();
      break;
    case DistModelDataType::INT8:
      dt = py::dtype::of<int8_t>();
      break;
    case DistModelDataType::FLOAT16:
      dt = py::dtype::of<paddle::platform::float16>();
      break;
    default:
      PADDLE_THROW(platform::errors::Unimplemented(
          "Unsupported data type. Now only supports INT32, INT64, INT8, "
          "FLOAT16 and FLOAT32."));
  }
  return dt;
}

// wrap the tensor's buffer as a numpy array without copying; the returned
// array shares memory with the tensor
py::array DistModelTensorGetData(DistModelTensor& tensor) {  // NOLINT
  py::dtype dt = DistModelTypeToNumpyDType(tensor.dtype);
  return py::array(std::move(dt), {tensor.shape}, tensor.data.data());
}

void BindFleetExecutor(py::module* m) {
  py::class_<FleetExecutor>(*m, "FleetExecutor")
      .def(py::init<const std::string&>())
      .def("init", &FleetExecutor::Init)
      .def("run", &FleetExecutor::Run,
           py::call_guard<py::gil_scoped_release>());

  py::class_<TaskNode>(*m, "TaskNode")
      .def(py::init<ProgramDesc*, int64_t, int64_t, int64_t>())
      .def(py::init<int32_t, const std::vector<OpDesc*>&, int64_t, int64_t,
                    int64_t, int64_t>())
      .def("task_id", &TaskNode::task_id)
      .def("add_upstream_task", &TaskNode::AddUpstreamTask)
      .def("add_downstream_task", &TaskNode::AddDownstreamTask)
      .def("set_run_pre_steps", &TaskNode::SetRunPerSteps)
      .def("set_run_at_offset", &TaskNode::SetRunAtOffset)
      .def("set_type", &TaskNode::SetType)
      .def("role", &TaskNode::role)
      .def("init", &TaskNode::Init)
      .def("set_program", &TaskNode::SetProgram);

  py::class_<DistModelConfig>(*m, "DistModelConfig")
      .def(py::init<>())
      .def_readwrite("model_dir", &DistModelConfig::model_dir)
      .def_readwrite("program_desc", &DistModelConfig::program_desc)
      .def_readwrite("scope", &DistModelConfig::scope)
      .def_readwrite("place", &DistModelConfig::place)
      .def_readwrite("device_id", &DistModelConfig::device_id)
      .def_readwrite("trainer_endpoints", &DistModelConfig::trainer_endpoints)
      .def_readwrite("current_endpoint", &DistModelConfig::current_endpoint)
      .def_readwrite("nranks", &DistModelConfig::nranks)
      .def_readwrite("local_rank", &DistModelConfig::local_rank)
      .def_readwrite("mp_degree", &DistModelConfig::mp_degree)
      .def_readwrite("pp_degree", &DistModelConfig::pp_degree)
      .def_readwrite("mp_ring_id", &DistModelConfig::mp_ring_id)
      .def_readwrite("enable_timer", &DistModelConfig::enable_timer)
      .def_readwrite("pp_upstream_ring_id",
                     &DistModelConfig::pp_upstream_ring_id)
      .def_readwrite("pp_downstream_ring_id",
                     &DistModelConfig::pp_downstream_ring_id);

  py::class_<DistModel>(*m, "DistModel")
      .def(py::init<const DistModelConfig&>())
      .def("init", &DistModel::Init)
      .def("run",
           [](DistModel& self, const std::vector<DistModelTensor>& inputs) {
             std::vector<DistModelTensor> outputs;
             self.Run(inputs, &outputs);
             return outputs;
           });
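
  // A usage sketch for the DistModel binding above, as seen from Python.
  // It assumes these bindings are registered into the core extension
  // module (e.g. paddle.fluid.core); the model path is hypothetical, and
  // inputs is a list of DistModelTensor objects (bound below):
  //
  //   config = core.DistModelConfig()
  //   config.model_dir = "/path/to/inference_model"  # hypothetical path
  //   config.nranks = 1
  //   config.local_rank = 0
  //   model = core.DistModel(config)
  //   model.init()
  //   outputs = model.run(inputs)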

  py::class_<DistModelDataBuf>(*m, "DistModelDataBuf")
      .def(py::init<size_t>())
      .def(py::init([](std::vector<float>& data) {
        auto buf = DistModelDataBuf(data.size() * sizeof(float));
        std::memcpy(buf.data(), static_cast<void*>(data.data()), buf.length());
        return buf;
      }))
      .def(py::init(&DistModelDataBufCreate<int32_t>))
      .def(py::init(&DistModelDataBufCreate<int64_t>))
      .def(py::init(&DistModelDataBufCreate<float>))
      .def("reset",
           [](DistModelDataBuf& self, std::vector<float>& data) {
             self.Resize(data.size() * sizeof(float));
             std::memcpy(self.data(), data.data(), self.length());
           })
      .def("reset", &DistModelDataBufReset<int32_t>)
      .def("reset", &DistModelDataBufReset<int64_t>)
      .def("reset", &DistModelDataBufReset<float>)
      .def("length", &DistModelDataBuf::length)
      .def("tolist",
           [](DistModelDataBuf& self, const std::string& dtype) -> py::list {
             py::list l;
             if (dtype == "int32") {
               auto* data = static_cast<int32_t*>(self.data());
               auto size = self.length() / sizeof(int32_t);
               l = py::cast(std::vector<int32_t>(data, data + size));
             } else if (dtype == "int64") {
               auto* data = static_cast<int64_t*>(self.data());
               auto size = self.length() / sizeof(int64_t);
               l = py::cast(std::vector<int64_t>(data, data + size));
             } else if (dtype == "float32") {
               auto* data = static_cast<float*>(self.data());
               auto size = self.length() / sizeof(float);
               l = py::cast(std::vector<float>(data, data + size));
             } else {
               PADDLE_THROW(platform::errors::Unimplemented(
                   "Unsupported data type. Now only supports INT32, INT64 and "
                   "FLOAT32."));
             }
             return l;
           });

  py::class_<DistModelTensor>(*m, "DistModelTensor")
      .def(py::init<>())
      .def(py::init(&DistModelTensorCreate<int32_t>), py::arg("data"),
           py::arg("name") = "",
           py::arg("lod") = std::vector<std::vector<size_t>>(),
           py::arg("copy") = true)
      .def(py::init(&DistModelTensorCreate<int64_t>), py::arg("data"),
           py::arg("name") = "",
           py::arg("lod") = std::vector<std::vector<size_t>>(),
           py::arg("copy") = true)
      .def(py::init(&DistModelTensorCreate<float>), py::arg("data"),
           py::arg("name") = "",
           py::arg("lod") = std::vector<std::vector<size_t>>(),
           py::arg("copy") = true)
      .def_readwrite("name", &DistModelTensor::name)
      .def_readwrite("shape", &DistModelTensor::shape)
      .def_readwrite("data", &DistModelTensor::data)
      .def_readwrite("dtype", &DistModelTensor::dtype)
      .def_readwrite("lod", &DistModelTensor::lod)
      .def("as_ndarray", &DistModelTensorGetData);

  py::enum_<DistModelDataType>(*m, "DistModelDataType")
      .value("FLOAT32", DistModelDataType::FLOAT32)
      .value("INT64", DistModelDataType::INT64)
      .value("INT32", DistModelDataType::INT32);
}

}  // namespace pybind
}  // namespace paddle