From 08793179a215f72ae26fa395c065a32d39315a36 Mon Sep 17 00:00:00 2001 From: Yuang Liu Date: Fri, 21 Jan 2022 07:48:37 +0800 Subject: [PATCH] [fleet executor] add a tensor wrapper to support python numpy input (#39076) --- .../distributed/fleet_executor/CMakeLists.txt | 4 +- .../distributed/fleet_executor/dist_model.cc | 4 +- .../distributed/fleet_executor/dist_model.h | 5 +- .../dist_model_tensor_wrapper.cc | 100 ++++++++++++ .../dist_model_tensor_wrapper.h | 77 +++++++++ paddle/fluid/pybind/bind_fleet_executor.cc | 153 ++++++++++++++++++ .../fluid/tests/unittests/CMakeLists.txt | 1 + .../tests/unittests/test_dist_model_tensor.py | 63 ++++++++ 8 files changed, 401 insertions(+), 6 deletions(-) create mode 100644 paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.cc create mode 100644 paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h create mode 100644 python/paddle/fluid/tests/unittests/test_dist_model_tensor.py diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index 1e31187367b..3e734b1b9ed 100644 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -12,8 +12,8 @@ endif() cc_library(task_loop_thread_pool SRCS task_loop_thread_pool.cc task_loop_thread.cc task_loop.cc DEPS enforce glog) -cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc dist_model.cc - interceptor.cc compute_interceptor.cc amplifier_interceptor.cc message_service.cc message_bus.cc +cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc task_node.cc runtime_graph.cc dist_model.cc interceptor.cc + compute_interceptor.cc amplifier_interceptor.cc message_service.cc message_bus.cc dist_model_tensor_wrapper.cc DEPS proto_desc fleet_executor_desc_proto interceptor_message_proto task_loop_thread_pool collective_helper op_registry executor_gc_helper gflags glog ${BRPC_DEPS}) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index 310c809de71..6454a349505 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -355,8 +355,8 @@ bool DistModel::PrepareFeedAndFetch() { return true; } -void DistModel::Run(const std::vector &input_data, - std::vector *output_data) { +void DistModel::Run(const std::vector &input_data, + std::vector *output_data) { /* TODO(fleet exe dev): implement this funct */ } diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.h b/paddle/fluid/distributed/fleet_executor/dist_model.h index d6dc554f158..96e9c018074 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.h +++ b/paddle/fluid/distributed/fleet_executor/dist_model.h @@ -17,6 +17,7 @@ #include #include +#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h" #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/platform/macros.h" @@ -56,8 +57,8 @@ class DistModel { public: explicit DistModel(const DistModelConfig& config) : config_(config) {} bool Init(); - void Run(const std::vector& input_data, - std::vector* output_data); + void Run(const std::vector& input_data, + std::vector* output_data); ~DistModel() = default; private: diff --git a/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.cc b/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.cc new file mode 100644 index 00000000000..b440d39c73a --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.cc @@ -0,0 +1,100 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h" +#include "paddle/fluid/platform/enforce.h" + +namespace paddle { +namespace distributed { + +void DistModelDataBuf::Reset(void* data, size_t length) { + Free(); + memory_owned_ = false; + data_ = data; + length_ = length; +} + +void DistModelDataBuf::Free() { + if (memory_owned_ && data_) { + PADDLE_ENFORCE_GT(length_, 0UL, + platform::errors::PreconditionNotMet( + "Error occurred when deconstruct DistModelDataBuf: " + "it contains no data!")); + // NOTE: if own the memory, it must be char* type + delete[] static_cast(data_); + data_ = nullptr; + length_ = 0; + } +} + +void DistModelDataBuf::Resize(size_t length) { + if (length_ >= length) { + return; + } + if (memory_owned_) { + Free(); + data_ = new char[length]; + length_ = length; + memory_owned_ = true; + } else { + PADDLE_THROW(platform::errors::PreconditionNotMet( + "The memory is allocated externally, can not Resized")); + } +} + +DistModelDataBuf& DistModelDataBuf::operator=(const DistModelDataBuf& other) { + if (!other.memory_owned_) { + data_ = other.data_; + length_ = other.length_; + memory_owned_ = other.memory_owned_; + } else { + Resize(other.length_); + if (other.length() && other.data()) { + std::memcpy(data_, other.data(), other.length()); + } else if (other.length()) { + PADDLE_THROW(platform::errors::InvalidArgument( + "Invalid argument, null pointer data with length %u is passed", + other.length())); + } + length_ = other.length_; + memory_owned_ = true; + } + return *this; +} + +DistModelDataBuf& DistModelDataBuf::operator=(DistModelDataBuf&& other) { + data_ = other.data_; + memory_owned_ = other.memory_owned_; + length_ = other.length_; + other.data_ = nullptr; + other.length_ = 0; + other.memory_owned_ = false; + return *this; +} + +DistModelDataBuf::DistModelDataBuf(DistModelDataBuf&& other) + : data_(other.data_), + length_(other.length_), + memory_owned_(other.memory_owned_) { + other.memory_owned_ = false; + other.data_ = nullptr; + other.length_ = 0; +} + +DistModelDataBuf::DistModelDataBuf(const DistModelDataBuf& other) { + *this = other; +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h b/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h new file mode 100644 index 00000000000..4a04633388a --- /dev/null +++ b/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h @@ -0,0 +1,77 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include +#include "paddle/fluid/platform/macros.h" + +namespace paddle { +namespace distributed { + +enum DistModelDataType { FLOAT16, FLOAT32, INT64, INT32, INT8 }; + +template +constexpr DistModelDataType DistModelGetDtype(); + +template <> +constexpr DistModelDataType DistModelGetDtype() { + return DistModelDataType::INT32; +} + +template <> +constexpr DistModelDataType DistModelGetDtype() { + return DistModelDataType::INT64; +} + +template <> +constexpr DistModelDataType DistModelGetDtype() { + return DistModelDataType::FLOAT32; +} + +class DistModelDataBuf { + public: + explicit DistModelDataBuf(size_t length) + : data_(new char[length]), length_(length), memory_owned_(true) {} + DistModelDataBuf(void* data, size_t length) + : data_(data), length_(length), memory_owned_(false) {} + void Reset(void* data, size_t length); + size_t length() const { return length_; } + void* data() const { return data_; } + ~DistModelDataBuf() { Free(); } + DistModelDataBuf() = default; + void Resize(size_t length); + + DistModelDataBuf& operator=(const DistModelDataBuf& other); + DistModelDataBuf& operator=(DistModelDataBuf&& other); + DistModelDataBuf(DistModelDataBuf&& other); + DistModelDataBuf(const DistModelDataBuf& other); + + private: + void Free(); + void* data_{nullptr}; + size_t length_{0}; + bool memory_owned_{false}; +}; + +struct DistModelTensor { + std::string name; + std::vector shape; + DistModelDataBuf data; + DistModelDataType dtype; + std::vector> lod; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/pybind/bind_fleet_executor.cc b/paddle/fluid/pybind/bind_fleet_executor.cc index 08f8aec5288..450939dd0ff 100644 --- a/paddle/fluid/pybind/bind_fleet_executor.cc +++ b/paddle/fluid/pybind/bind_fleet_executor.cc @@ -13,8 +13,12 @@ // limitations under the License. #include "paddle/fluid/pybind/bind_fleet_executor.h" +#include #include +#include +#include #include "paddle/fluid/distributed/fleet_executor/dist_model.h" +#include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h" #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" #include "paddle/fluid/distributed/fleet_executor/task_node.h" #include "paddle/fluid/framework/operator.h" @@ -31,9 +35,90 @@ using paddle::distributed::FleetExecutor; using paddle::distributed::TaskNode; using paddle::distributed::DistModelConfig; using paddle::distributed::DistModel; +using paddle::distributed::DistModelDataBuf; +using paddle::distributed::DistModelTensor; +using paddle::distributed::DistModelDataType; using paddle::framework::OpDesc; using paddle::framework::ProgramDesc; +template +DistModelDataBuf DistModelDataBufCreate( + py::array_t data) { + // accept numpy array directly + DistModelDataBuf buf(data.size() * sizeof(T)); + std::copy_n(static_cast(data.data()), data.size(), + static_cast(buf.data())); + return buf; +} + +template +void DistModelDataBufReset( + DistModelDataBuf& buf, // NOLINT + py::array_t data) { // NOLINT + // reset the data with numpy array directly + buf.Resize(data.size() * sizeof(T)); + std::copy_n(static_cast(data.data()), data.size(), + static_cast(buf.data())); +} + +template +DistModelTensor DistModelTensorCreate( + py::array_t data, + const std::string name, const std::vector>& lod, + bool copy) { + DistModelTensor tensor; + + if (copy) { + DistModelDataBuf buf(data.size() * sizeof(T)); + std::copy_n(static_cast(data.data()), data.size(), + static_cast(buf.data())); + tensor.data = std::move(buf); + } else { + tensor.data = + DistModelDataBuf(data.mutable_data(), data.size() * sizeof(T)); + } + + tensor.dtype = paddle::distributed::DistModelGetDtype(); + tensor.name = name; + tensor.lod = lod; + tensor.shape.resize(data.ndim()); + std::copy_n(data.shape(), data.ndim(), tensor.shape.begin()); + + return tensor; +} + +py::dtype DistModelTypeToNumpyDType(DistModelDataType dtype) { + py::dtype dt; + switch (dtype) { + case DistModelDataType::INT32: + dt = py::dtype::of(); + break; + case DistModelDataType::INT64: + dt = py::dtype::of(); + break; + case DistModelDataType::FLOAT32: + dt = py::dtype::of(); + break; + case DistModelDataType::INT8: + dt = py::dtype::of(); + break; + case DistModelDataType::FLOAT16: + dt = py::dtype::of(); + break; + default: + PADDLE_THROW(platform::errors::Unimplemented( + "Unsupported data type. Now only supports INT32, INT64, INT8, " + "FLOAT16 and FLOAT32.")); + } + + return dt; +} + +py::array DistModelTensorGetData(DistModelTensor& tensor) { // NOLINT + py::dtype dt = DistModelTypeToNumpyDType(tensor.dtype); + return py::array(std::move(dt), {tensor.shape}, tensor.data.data()); +} + void BindFleetExecutor(py::module* m) { py::class_(*m, "FleetExecutor") .def(py::init()) @@ -78,6 +163,74 @@ void BindFleetExecutor(py::module* m) { .def(py::init()) .def("init", &DistModel::Init) .def("run", &DistModel::Run, py::call_guard()); + + py::class_(*m, "DistModelDataBuf") + .def(py::init()) + .def(py::init([](std::vector& data) { + auto buf = DistModelDataBuf(data.size() * sizeof(float)); + std::memcpy(buf.data(), static_cast(data.data()), buf.length()); + return buf; + })) + .def(py::init(&DistModelDataBufCreate)) + .def(py::init(&DistModelDataBufCreate)) + .def(py::init(&DistModelDataBufCreate)) + .def("reset", + [](DistModelDataBuf& self, std::vector& data) { + self.Resize(data.size() * sizeof(float)); + std::memcpy(self.data(), data.data(), self.length()); + }) + .def("reset", &DistModelDataBufReset) + .def("reset", &DistModelDataBufReset) + .def("reset", &DistModelDataBufReset) + .def("length", &DistModelDataBuf::length) + .def("tolist", + [](DistModelDataBuf& self, const std::string& dtype) -> py::list { + py::list l; + if (dtype == "int32") { + auto* data = static_cast(self.data()); + auto size = self.length() / sizeof(int32_t); + l = py::cast(std::vector(data, data + size)); + } else if (dtype == "int64") { + auto* data = static_cast(self.data()); + auto size = self.length() / sizeof(int64_t); + l = py::cast(std::vector(data, data + size)); + } else if (dtype == "float32") { + auto* data = static_cast(self.data()); + auto size = self.length() / sizeof(float); + l = py::cast(std::vector(data, data + size)); + } else { + PADDLE_THROW(platform::errors::Unimplemented( + "Unsupported data type. Now only supports INT32, INT64 and " + "FLOAT32.")); + } + return l; + }); + + py::class_(*m, "DistModelTensor") + .def(py::init<>()) + .def(py::init(&DistModelTensorCreate), py::arg("data"), + py::arg("name") = "", + py::arg("lod") = std::vector>(), + py::arg("copy") = true) + .def(py::init(&DistModelTensorCreate), py::arg("data"), + py::arg("name") = "", + py::arg("lod") = std::vector>(), + py::arg("copy") = true) + .def(py::init(&DistModelTensorCreate), py::arg("data"), + py::arg("name") = "", + py::arg("lod") = std::vector>(), + py::arg("copy") = true) + .def_readwrite("name", &DistModelTensor::name) + .def_readwrite("shape", &DistModelTensor::shape) + .def_readwrite("data", &DistModelTensor::data) + .def_readwrite("dtype", &DistModelTensor::dtype) + .def_readwrite("lod", &DistModelTensor::lod) + .def("as_ndarray", &DistModelTensorGetData); + + py::enum_(*m, "DistModelDataType") + .value("FLOAT32", DistModelDataType::FLOAT32) + .value("INT64", DistModelDataType::INT64) + .value("INT32", DistModelDataType::INT32); } } // namespace pybind } // namespace paddle diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 915af18a570..5c57d1a21bc 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -152,6 +152,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_origin_scheduler) LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_mapper) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_task_node) + LIST(REMOVE_ITEM TEST_OPS test_dist_model_tensor) endif() # Temporally disable test_deprecated_decorator diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_tensor.py b/python/paddle/fluid/tests/unittests/test_dist_model_tensor.py new file mode 100644 index 00000000000..da25550c4f4 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_model_tensor.py @@ -0,0 +1,63 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import numpy as np +from paddle.fluid.core import DistModelTensor +from paddle.fluid.core import DistModelDataType + +paddle.enable_static() + + +class TestDistModelTensor(unittest.TestCase): + def test_dist_model_tensor(self): + tensor_32 = np.random.randint(10, 20, size=[20, 2]).astype('int32') + dist_tensor32 = DistModelTensor(tensor_32, '32_tensor') + self.assertEqual(dist_tensor32.dtype, DistModelDataType.INT32) + self.assertEqual( + dist_tensor32.data.tolist('int32'), tensor_32.ravel().tolist()) + # the length is how many byte the data contains + self.assertEqual(dist_tensor32.data.length(), 40 * 4) + self.assertEqual(dist_tensor32.name, '32_tensor') + dist_tensor32.data.reset(tensor_32) + self.assertEqual(dist_tensor32.as_ndarray().ravel().tolist(), + tensor_32.ravel().tolist()) + + tensor_64 = np.random.randint(10, 20, size=[20, 2]).astype('int64') + dist_tensor64 = DistModelTensor(tensor_64, '64_tensor') + self.assertEqual(dist_tensor64.dtype, DistModelDataType.INT64) + self.assertEqual( + dist_tensor64.data.tolist('int64'), tensor_64.ravel().tolist()) + self.assertEqual(dist_tensor64.data.length(), 40 * 8) + self.assertEqual(dist_tensor64.name, '64_tensor') + dist_tensor64.data.reset(tensor_64) + self.assertEqual(dist_tensor64.as_ndarray().ravel().tolist(), + tensor_64.ravel().tolist()) + + tensor_float = np.random.randn(20, 2).astype('float32') + dist_tensor_float = DistModelTensor(tensor_float, 'float_tensor') + self.assertEqual(dist_tensor_float.dtype, DistModelDataType.FLOAT32) + self.assertEqual( + dist_tensor_float.data.tolist('float32'), + tensor_float.ravel().tolist()) + self.assertEqual(dist_tensor_float.data.length(), 40 * 4) + self.assertEqual(dist_tensor_float.name, 'float_tensor') + dist_tensor_float.data.reset(tensor_float) + self.assertEqual(dist_tensor_float.as_ndarray().ravel().tolist(), + tensor_float.ravel().tolist()) + + +if __name__ == '__main__': + unittest.main() -- GitLab