From e553f758163e61dbea7acca5c9c78b2fa1ee701c Mon Sep 17 00:00:00 2001 From: Zhong Hui Date: Mon, 14 Mar 2022 12:05:30 +0800 Subject: [PATCH] [multiprocessing] Add paddle.incubate.multiprocessing for sharing tensors between python processes. (#37302) * Add support for paddle.multiprocessing * move multiprocessing to incubate. --- paddle/fluid/memory/allocation/CMakeLists.txt | 3 + .../memory/allocation/cuda_ipc_allocator.cc | 80 +++++ .../memory/allocation/cuda_ipc_allocator.h | 56 ++++ .../fluid/memory/allocation/mmap_allocator.cc | 187 ++++++++++-- .../fluid/memory/allocation/mmap_allocator.h | 69 ++++- paddle/fluid/pybind/CMakeLists.txt | 3 + paddle/fluid/pybind/pybind.cc | 284 ++++++++++++++++++ .../fluid/tests/unittests/CMakeLists.txt | 2 + .../unittests/test_paddle_multiprocessing.py | 199 ++++++++++++ .../incubate/multiprocessing/__init__.py | 27 ++ .../incubate/multiprocessing/reductions.py | 189 ++++++++++++ 11 files changed, 1080 insertions(+), 19 deletions(-) create mode 100644 paddle/fluid/memory/allocation/cuda_ipc_allocator.cc create mode 100644 paddle/fluid/memory/allocation/cuda_ipc_allocator.h create mode 100644 python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py create mode 100644 python/paddle/incubate/multiprocessing/__init__.py create mode 100644 python/paddle/incubate/multiprocessing/reductions.py diff --git a/paddle/fluid/memory/allocation/CMakeLists.txt b/paddle/fluid/memory/allocation/CMakeLists.txt index a7a417c29a7..f296ce96d4e 100644 --- a/paddle/fluid/memory/allocation/CMakeLists.txt +++ b/paddle/fluid/memory/allocation/CMakeLists.txt @@ -131,4 +131,7 @@ cc_library(virtual_memory_auto_growth_best_fit_allocator SRCS virtual_memory_aut if(NOT WIN32) cc_library(mmap_allocator SRCS mmap_allocator.cc DEPS allocator) cc_test(mmap_allocator_test SRCS mmap_allocator_test.cc DEPS mmap_allocator allocator) + if (WITH_GPU) + cc_library(cuda_ipc_allocator SRCS cuda_ipc_allocator.cc DEPS allocator) + endif() endif(NOT WIN32) diff --git a/paddle/fluid/memory/allocation/cuda_ipc_allocator.cc b/paddle/fluid/memory/allocation/cuda_ipc_allocator.cc new file mode 100644 index 00000000000..b2f24d5aed1 --- /dev/null +++ b/paddle/fluid/memory/allocation/cuda_ipc_allocator.cc @@ -0,0 +1,80 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
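Before the C++ plumbing that follows, this is roughly what the new module looks like from user code. A minimal sketch distilled from the unit test added later in this patch (test_paddle_multiprocessing.py): Linux only, CPU tensors travel through the share-filename strategy, and the helper name `worker` is illustrative rather than part of the patch.

import paddle
import paddle.incubate.multiprocessing as mp  # drop-in replacement for multiprocessing

def worker(queue, event):
    shared = queue.get()      # arrives via shared memory, not as a byte copy
    with paddle.no_grad():
        shared[:] = 5         # writes are visible to the parent process
    event.set()

if __name__ == "__main__":
    paddle.set_device("cpu")
    tensor = paddle.zeros([5, 5], dtype="float32")
    queue, event = mp.Queue(), mp.Event()
    queue.put(tensor)         # pickled by the reducers registered in reductions.py
    proc = mp.Process(target=worker, args=(queue, event))
    proc.start()
    event.wait(30)
    print(tensor)             # now filled with 5s in the parent as well
    proc.join()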
+ +#ifndef _WIN32 + +#include "paddle/fluid/memory/allocation/cuda_ipc_allocator.h" +#include "paddle/fluid/platform/cuda_device_guard.h" + +#include +#include +#include +#include +#include + +#include "glog/logging.h" +#include "paddle/fluid/platform/enforce.h" + +namespace paddle { +namespace memory { +namespace allocation { + +namespace { +std::mutex ipc_mutex_; +std::unordered_map<std::string, std::weak_ptr<void>> ipc_handle_to_baseptr_; +} // namespace + +std::shared_ptr<void> GetIpcBasePtr(std::string handle) { + std::lock_guard<std::mutex> lock(ipc_mutex_); + + auto iter = ipc_handle_to_baseptr_.find(handle); + if (iter != ipc_handle_to_baseptr_.end()) { + auto baseptr = iter->second.lock(); + if (baseptr) return baseptr; + } + // The IpcMemHandle can only be opened once for the same handle, + // so we cache the opened base pointer here. + void *baseptr = nullptr; + auto ipc_handle = + reinterpret_cast<const cudaIpcMemHandle_t *>(handle.c_str()); + PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcOpenMemHandle( + &baseptr, *ipc_handle, cudaIpcMemLazyEnablePeerAccess)); + // Close ipc handle on the same device. + int device_id = platform::GetCurrentDeviceId(); + // Add deleter to close ipc handle. + auto sp = std::shared_ptr<void>(baseptr, [handle, device_id](void *ptr) { + platform::CUDADeviceGuard guard(device_id); + std::lock_guard<std::mutex> lock(ipc_mutex_); + PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcCloseMemHandle(ptr)); + ipc_handle_to_baseptr_.erase(handle); + VLOG(6) << "cudaIpcCloseMemHandle for ptr:" + << "\t" << ptr; + }); + std::weak_ptr<void> wp = sp; + ipc_handle_to_baseptr_.insert(iter, {handle, wp}); + + return sp; +} + +CudaIpcAllocation::~CudaIpcAllocation() { + shared_ptr_.reset(); + VLOG(6) << "tensor deleted cudaIpcCloseMemHandle for ptr:" + << "\t" << this->ptr(); +} + +} // namespace allocation +} // namespace memory +} // namespace paddle + +#endif diff --git a/paddle/fluid/memory/allocation/cuda_ipc_allocator.h b/paddle/fluid/memory/allocation/cuda_ipc_allocator.h new file mode 100644 index 00000000000..52e3cf10ea7 --- /dev/null +++ b/paddle/fluid/memory/allocation/cuda_ipc_allocator.h @@ -0,0 +1,56 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
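GetIpcBasePtr in cuda_ipc_allocator.cc above opens a given cudaIpcMemHandle at most once per process: opened base pointers are kept in a map of weak pointers, and the shared_ptr deleter closes the handle and evicts the cache entry once the last CudaIpcAllocation using it is gone. Below is a rough Python analogy of that caching scheme; it is not part of the patch, and `open_fn`, `close_fn`, and `_OpenedHandle` are hypothetical stand-ins for cudaIpcOpenMemHandle, cudaIpcCloseMemHandle, and the opened base pointer.

import threading
import weakref

_lock = threading.Lock()
_cache = {}  # handle -> weakref.ref to the opened base-pointer wrapper

class _OpenedHandle:
    """Stand-in for the opened IPC base pointer; its lifetime drives cleanup."""
    def __init__(self, handle, base_ptr):
        self.handle = handle
        self.base_ptr = base_ptr

def get_ipc_base(handle, open_fn, close_fn):
    with _lock:
        ref = _cache.get(handle)
        if ref is not None:
            alive = ref()
            if alive is not None:  # reuse: the same handle must not be reopened
                return alive
        opened = _OpenedHandle(handle, open_fn(handle))

        def _on_last_release(_ref, handle=handle, base=opened.base_ptr):
            with _lock:
                _cache.pop(handle, None)  # mirrors ipc_handle_to_baseptr_.erase(handle)
            close_fn(base)                # mirrors cudaIpcCloseMemHandle(ptr)

        _cache[handle] = weakref.ref(opened, _on_last_release)
        return opened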
+ +#ifndef _WIN32 +#pragma once + +#include +#include // NOLINT +#include +#include +#include + +#include "paddle/fluid/memory/allocation/allocator.h" +#include "paddle/fluid/platform/cuda_device_guard.h" +#include "paddle/fluid/platform/device/gpu/gpu_info.h" +#include "paddle/fluid/platform/enforce.h" + +namespace paddle { +namespace memory { +namespace allocation { + +std::shared_ptr GetIpcBasePtr(std::string handle); + +class CudaIpcAllocation : public Allocation { + public: + explicit CudaIpcAllocation(void *ptr, size_t size, int device_id, + std::shared_ptr shared_ptr) + : Allocation(ptr, size, platform::CUDAPlace(device_id)), + device_id_(std::move(device_id)), + shared_ptr_(std::move(shared_ptr)) {} + + inline const int &device_id() const { return device_id_; } + + ~CudaIpcAllocation() override; + + private: + int device_id_; + std::shared_ptr shared_ptr_; +}; + +} // namespace allocation +} // namespace memory +} // namespace paddle + +#endif diff --git a/paddle/fluid/memory/allocation/mmap_allocator.cc b/paddle/fluid/memory/allocation/mmap_allocator.cc index acaf5d54855..25c2235cce8 100644 --- a/paddle/fluid/memory/allocation/mmap_allocator.cc +++ b/paddle/fluid/memory/allocation/mmap_allocator.cc @@ -29,6 +29,155 @@ namespace paddle { namespace memory { namespace allocation { +std::string GetIPCName() { + static std::random_device rd; + std::string handle = "/paddle_"; +#ifdef _WIN32 + handle += std::to_string(GetCurrentProcessId()); +#else + handle += std::to_string(getpid()); +#endif + handle += "_"; + handle += std::to_string(rd()); + return handle; +} + +struct CountInfo { + std::atomic refcount; +}; + +void AllocateMemoryMap(std::string filename, int flags, size_t size, + void **map_ptr_, int *fd_) { + // TODO(@ZHUI): support win32 + int file_flags = 0; + int fd = -1; + if (flags & MAPPED_SHAREDMEM) { + file_flags = O_RDWR | O_CREAT; + } else { + file_flags = O_RDONLY; + } + if (flags & MAPPED_EXCLUSIVE) { + file_flags |= O_EXCL; + } + if (flags & MAPPED_NOCREATE) { + file_flags &= ~O_CREAT; + } + + if (!(flags & MAPPED_FROMFD)) { + if (flags & MAPPED_SHAREDMEM) { + fd = shm_open(filename.c_str(), file_flags, (mode_t)0600); + PADDLE_ENFORCE_NE( + fd, -1, + platform::errors::Unavailable( + "File descriptor %s open failed, unable in read-write mode", + filename.c_str())); + VLOG(6) << "shm_open: " << filename; + } + } else { + fd = -1; + } + + PADDLE_ENFORCE_EQ(ftruncate(fd, size), 0, + platform::errors::Unavailable( + "Fruncate a file to a specified length failed!")); + + if (flags & MAPPED_SHAREDMEM) { + *map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + } else { + *map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0); + } + + PADDLE_ENFORCE_NE(*map_ptr_, MAP_FAILED, + platform::errors::Unavailable( + "Memory map failed when create shared memory.")); + + if (flags & MAPPED_KEEPFD) { + *fd_ = fd; + } else { + PADDLE_ENFORCE_NE(::close(fd), -1, + platform::errors::Unavailable( + "Error closing memory maped file <", filename, ">")); + + *fd_ = -1; + } +} + +std::shared_ptr +AllocateRefcountedMemoryMapAllocation(std::string filename, int flags, + size_t size) { + int fd = -1; + void *base_ptr = nullptr; + AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd); + void *aliged_base_ptr = + static_cast(static_cast(base_ptr) + mmap_alignment); + return std::make_shared(aliged_base_ptr, size, + filename, flags, fd); +} + +RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation( + void *ptr, size_t size, 
std::string ipc_name, int fd, int flags) + : MemoryMapAllocation(ptr, size, ipc_name, fd, flags) { + // must reset base ptr first. + resetBaseptr(); + initializeRefercount(); +} + +void MemoryMapAllocation::close() { + if (closed_) { + return; + } + closed_ = true; +} + +MemoryMapAllocation::~MemoryMapAllocation() { close(); } + +void RefcountedMemoryMapAllocation::incref() { + CountInfo *info = static_cast(map_ptr_); + ++info->refcount; +} + +int RefcountedMemoryMapAllocation::decref() { + CountInfo *info = static_cast(map_ptr_); + return --info->refcount == 0; +} + +void RefcountedMemoryMapAllocation::resetBaseptr() { + map_ptr_ = + static_cast(static_cast(map_ptr_) - mmap_alignment); + map_size_ = map_size_ + mmap_alignment; +} + +void RefcountedMemoryMapAllocation::initializeRefercount() { + CountInfo *info = reinterpret_cast(map_ptr_); + + if (flags_ & MAPPED_EXCLUSIVE) { + new (&info->refcount) std::atomic(1); + } else { + info->refcount++; + } +} + +void RefcountedMemoryMapAllocation::close() { + if (closed_) { + return; + } + closed_ = true; + void *data = map_ptr_; + CountInfo *info = reinterpret_cast(data); + if (--info->refcount == 0) { + PADDLE_ENFORCE_NE( + shm_unlink(ipc_name_.c_str()), -1, + platform::errors::Unavailable( + "could not unlink the shared memory file ", ipc_name_)); + VLOG(6) << "shm_unlink file: " << ipc_name_; + } + + PADDLE_ENFORCE_NE( + munmap(map_ptr_, map_size_), -1, + platform::errors::Unavailable("could not unmap the shared memory file: ", + strerror(errno), " (", errno, ")")); +} + MemoryMapWriterAllocation::~MemoryMapWriterAllocation() { PADDLE_ENFORCE_NE( munmap(this->ptr(), this->size()), -1, @@ -44,30 +193,30 @@ MemoryMapReaderAllocation::~MemoryMapReaderAllocation() { /* Here we do not pay attention to the result of shm_unlink, because the memory mapped file may have been cleared due to the MemoryMapFdSet::Clear() */ + + // Code of DataLoader subprocess: + // + // core._array_to_share_memory_tensor(b) + // out_queue.put((idx, tensor_list, structure)) + // core._remove_tensor_list_mmap_fds(tensor_list) + + /* If the tensor in already in the send queue, the tensor will be + * deconstructed by the function. If the tensor not send yet, it + * will be cleared by MemoryMapFdSet::Clear(). + * If the `_remove_tensor_list_mmap_fds` have be interrupted, the + * tensor will be cleared by both methods. 
+ * */ + shm_unlink(this->ipc_name().c_str()); MemoryMapFdSet::Instance().Remove(this->ipc_name()); VLOG(3) << "~MemoryMapReaderAllocation: " << this->ipc_name(); } -std::string GetIPCName() { - static std::random_device rd; - std::string handle = "/paddle_"; -#ifdef _WIN32 - handle += std::to_string(GetCurrentProcessId()); -#else - handle += std::to_string(getpid()); -#endif - handle += "_"; - handle += std::to_string(rd()); - return handle; -} - std::shared_ptr AllocateMemoryMapWriterAllocation( size_t size) { const std::string &ipc_name = GetIPCName(); int flags = O_RDWR | O_CREAT; - - int fd = shm_open(ipc_name.c_str(), flags, 0644); + int fd = shm_open(ipc_name.c_str(), flags, 0600); PADDLE_ENFORCE_NE( fd, -1, platform::errors::Unavailable("File descriptor %s open failed", ipc_name.c_str())); @@ -86,12 +235,14 @@ std::shared_ptr AllocateMemoryMapWriterAllocation( std::shared_ptr RebuildMemoryMapReaderAllocation( const std::string &ipc_name, size_t size) { - int fd = shm_open(ipc_name.c_str(), O_RDONLY, 0644); + int flags = O_RDWR | O_CREAT; + flags &= ~O_CREAT; + + int fd = shm_open(ipc_name.c_str(), flags, 0600); PADDLE_ENFORCE_NE( fd, -1, platform::errors::Unavailable("File descriptor %s open failed", ipc_name.c_str())); - - void *ptr = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0); + void *ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); PADDLE_ENFORCE_NE(ptr, MAP_FAILED, platform::errors::Unavailable( "Memory map failed when rebuild shared memory.")); diff --git a/paddle/fluid/memory/allocation/mmap_allocator.h b/paddle/fluid/memory/allocation/mmap_allocator.h index 3f91e5c4278..4f8dbfbb51e 100644 --- a/paddle/fluid/memory/allocation/mmap_allocator.h +++ b/paddle/fluid/memory/allocation/mmap_allocator.h @@ -16,8 +16,9 @@ #ifndef _WIN32 +#include #include -#include // NOLINT +#include #include #include #include @@ -28,6 +29,72 @@ namespace paddle { namespace memory { namespace allocation { +std::string GetIPCName(); + +static constexpr int64_t mmap_alignment = 64; + +enum MappedModes { + MAPPED_SHAREDMEM = 1, + MAPPED_EXCLUSIVE = 2, + MAPPED_NOCREATE = 4, + MAPPED_KEEPFD = 8, + MAPPED_FROMFD = 16, + MAPPED_UNLINK = 32 +}; + +class MemoryMapAllocation : public Allocation { + public: + explicit MemoryMapAllocation(void *ptr, size_t size, std::string ipc_name) + : Allocation(ptr, size, platform::CPUPlace()), + ipc_name_(std::move(ipc_name)), + map_ptr_(ptr), + map_size_(size) {} + explicit MemoryMapAllocation(void *ptr, size_t size, std::string ipc_name, + int flags, int fd) + : Allocation(ptr, size, platform::CPUPlace()), + ipc_name_(std::move(ipc_name)), + fd_(fd), + flags_(flags), + map_ptr_(ptr), + map_size_(size) {} + + inline const std::string &ipc_name() const { return ipc_name_; } + + virtual void close(); + + ~MemoryMapAllocation() override; + + protected: + std::string ipc_name_; + int fd_ = -1; + int flags_ = 0; + void *map_ptr_ = nullptr; + size_t map_size_ = 0; + bool closed_ = false; +}; + +class RefcountedMemoryMapAllocation : public MemoryMapAllocation { + public: + RefcountedMemoryMapAllocation(void *ptr, size_t size, std::string ipc_name, + int flags, int fd); + + void incref(); + int decref(); + void close() override; + virtual ~RefcountedMemoryMapAllocation() { close(); } + + protected: + void initializeRefercount(); + void resetBaseptr(); +}; + +void AllocateMemoryMap(std::string filename, int flags, size_t size, + void **base_ptr_, int *fd_); + +std::shared_ptr +AllocateRefcountedMemoryMapAllocation(std::string filename, int flags, + size_t 
size); + class MemoryMapWriterAllocation : public Allocation { public: explicit MemoryMapWriterAllocation(void *ptr, size_t size, diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 8ee22590b6d..2e901f3bffd 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -44,6 +44,9 @@ endif() if(NOT WIN32) set(PYBIND_DEPS ${PYBIND_DEPS} data_loader) set(PYBIND_DEPS ${PYBIND_DEPS} mmap_allocator) + if (WITH_GPU) + set(PYBIND_DEPS ${PYBIND_DEPS} cuda_ipc_allocator) + endif() if (WITH_NCCL OR WITH_RCCL) set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context) set(PYBIND_DEPS ${PYBIND_DEPS} heter_ccl_context) diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 98880294a27..ee6dce5dc23 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -64,6 +64,9 @@ limitations under the License. */ #include "paddle/fluid/imperative/amp_auto_cast.h" #include "paddle/fluid/imperative/layer.h" #include "paddle/fluid/memory/allocation/allocator_strategy.h" +#ifdef PADDLE_WITH_CUDA +#include "paddle/fluid/memory/allocation/cuda_ipc_allocator.h" +#endif #include "paddle/fluid/memory/allocation/mmap_allocator.h" #include "paddle/fluid/operators/activation_op.h" #include "paddle/fluid/operators/common_infer_shape_functions.h" @@ -1187,6 +1190,287 @@ PYBIND11_MODULE(core_noavx, m) { }); #else }) +#ifdef PADDLE_WITH_CUDA + .def("_share_buffer_with", + [](framework::Tensor &self, const framework::Tensor src, + py::tuple t) { + auto *cuda_ipc_allocation = + dynamic_cast( + src.Holder().get()); + + PADDLE_ENFORCE_NOT_NULL( + cuda_ipc_allocation, + platform::errors::PreconditionNotMet( + "Tensor is not Cuda IPC shared tensor. " + "Now only Tensor shared by cuda ipc could use this " + "api.")); + + size_t size = t[0].cast(); + auto dtype = + static_cast(t[1].cast()); + auto dims = phi::make_ddim(t[2].cast>()); + auto lod_info = t[3].cast(); + auto device_id = t[4].cast(); + + auto shared_reader_holder = + std::make_shared( + cuda_ipc_allocation->ptr(), + cuda_ipc_allocation->base_ptr(), size, + platform::CUDAPlace(device_id)); + + self.ResetHolderWithType(shared_reader_holder, dtype); + self.Resize(dims); + self.set_lod(lod_info); + + VLOG(6) << "Reconstructed tensor with buffer shared!"; + }, + R"DOC( + Deserialize GPU Tensor for existed shared Cuda IPC tensor. + + Params: + tensor: Shared Cuda IPC tensor. + tuple: contrains data size, data type, + tensor dims, lod information, device index. + + )DOC") + .def("_share_cuda", + [](framework::Tensor self) { + if (!self.IsInitialized() || self.numel() == 0) + throw std::runtime_error( + "Tensor not initialized or numel is 0. could not pass " + "to shared memory. "); + + auto *holder = dynamic_cast( + self.Holder().get()); + PADDLE_ENFORCE_EQ( + platform::is_gpu_place(holder->place()), true, + platform::errors::InvalidArgument( + "Tensor is not on GPU. share_cuda only support GPU " + "Tensor, share_filename is for CPU tensor.")); + + void *base_ptr = holder->base_ptr(); + ptrdiff_t offset_bytes = reinterpret_cast(holder->ptr()) - + reinterpret_cast(base_ptr); + + cudaIpcMemHandle_t handle; + PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcGetMemHandle(&handle, base_ptr)); + + auto _handle = py::bytes(reinterpret_cast(&handle), + (py::ssize_t)CUDA_IPC_HANDLE_SIZE); + + // TODO(ZHUI): use cuda event, to avoid sync. 
+ const auto &device_id = paddle::platform::GetCurrentDeviceId(); + auto stream = + paddle::platform::stream::get_current_stream(device_id); + stream->Synchronize(); + + int type_idx = static_cast(self.type()); + size_t data_size = + self.numel() * + framework::SizeOfType( + framework::TransToProtoVarType(self.type())); + + return py::make_tuple(_handle, (py::size_t)offset_bytes, data_size, + type_idx, vectorize(self.dims()), self.lod(), + device_id); + }, + R"DOC( + Serialize GPU Tensor by cudaIpcMemHandle. + + Returns: + tuple: contrains handle, data size, data type, + tensor dims, lod information, device index. + + Examples: + .. code-block:: python + + import paddle + tensor = paddle.ones([3,3]) + metainfo = tensor.value().get_tensor()._share_cuda() + + )DOC") + .def("_new_shared_cuda", + [](py::tuple t) { + if (t.size() != 7) + throw std::runtime_error( + "Invalid Tensor meta info for shared cuda tensor!"); + + // 1. Create a new C++ instance + framework::Tensor tensor; + + // 2. Rebuild Allocation from handle + const std::string &handle = t[0].cast(); + ptrdiff_t offset_bytes = (ptrdiff_t)t[1].cast(); + auto device_id = t[6].cast(); + auto base_ptr = memory::allocation::GetIpcBasePtr(handle); + size_t size = t[2].cast(); + void *dev = base_ptr.get(); + dev = reinterpret_cast(dev) + offset_bytes; + + auto shared_reader_holder = + std::make_shared( + dev, size, device_id, std::move(base_ptr)); + + // 3. Rebuild Tensor + tensor.ResetHolderWithType( + shared_reader_holder, + static_cast(t[3].cast())); + tensor.Resize(phi::make_ddim(t[4].cast>())); + tensor.set_lod(t[5].cast()); + + return tensor; + }, + R"DOC( + Deserialize GPU lod tensor from cudaIpcMemHandle. + + Params: + tuple: contrains handle, data size, data type, + tensor dims, lod information, device index. + + Examples: + .. code-block:: python + + import paddle + tensor = paddle.ones([3,3]) + metainfo = tensor.value().get_tensor()._share_cuda() + tensor_from_shared = paddle.to_tensor(paddle.fluid.core.LoDTensor._new_shared_cuda(metainfo)) + + )DOC") +#endif + .def("_share_filename", + [](framework::Tensor &self) { + if (!self.IsInitialized() || self.numel() == 0) + throw std::runtime_error( + "Tensor not initialized or numel is 0. could not pass to " + "shared memory. "); + + auto holder = self.Holder(); + PADDLE_ENFORCE_EQ( + platform::is_cpu_place(holder->place()) || + platform::is_cuda_pinned_place(holder->place()), + true, platform::errors::InvalidArgument( + "Tensor is not on CPU. share_filename only " + "support CPU Tensor.")); + + auto *mmap_allocation = dynamic_cast< + memory::allocation::RefcountedMemoryMapAllocation *>( + holder.get()); + // If the tensor is not shared, allocate memory map allocation. 
+ if (mmap_allocation == nullptr) { + void *data_ptr = self.data(); + size_t data_size = + self.numel() * + framework::SizeOfType( + framework::TransToProtoVarType(self.type())); + + int flags = memory::allocation::MAPPED_SHAREDMEM | + memory::allocation::MAPPED_EXCLUSIVE; + std::string handle = memory::allocation::GetIPCName(); + auto shared_holder = + memory::allocation::AllocateRefcountedMemoryMapAllocation( + handle, flags, data_size); + + // copy data & reset holder + if (platform::is_cuda_pinned_place(holder->place())) { +#ifdef PADDLE_WITH_CUDA + memory::Copy(platform::CPUPlace(), shared_holder->ptr(), + platform::CUDAPinnedPlace(), data_ptr, data_size); +#endif + } else { + memory::Copy(platform::CPUPlace(), shared_holder->ptr(), + platform::CPUPlace(), data_ptr, data_size); + } + self.ResetHolder(shared_holder); + mmap_allocation = shared_holder.get(); + } + int type_idx = static_cast(self.type()); + + return py::make_tuple(mmap_allocation->ipc_name(), + mmap_allocation->size(), type_idx, + vectorize(self.dims()), self.lod()); + }, + R"DOC( + Serialize CPU lod tensor in shared memory to tuple. + If the tensor is not in shared memory, we will copy it first. + + Returns: + tuple: contrains ipc name, data size, data type, + tensor dims and lod imformation. + + Examples: + .. code-block:: python + + import paddle + tensor = paddle.ones([3,3]) + metainfo = tensor.value().get_tensor()._share_filename() + + )DOC") + .def("_new_shared_filename", + [](py::tuple t) { // __setstate__ + if (t.size() != 5) + throw std::runtime_error("Invalid Tensor meta info state!"); + + framework::Tensor tensor; + + // 2. Rebuild Allocation + const std::string &ipc_name = t[0].cast(); + size_t size = t[1].cast(); + int flags = memory::allocation::MAPPED_SHAREDMEM | + memory::allocation::MAPPED_NOCREATE; + + auto shared_holder = + memory::allocation::AllocateRefcountedMemoryMapAllocation( + ipc_name, flags, size); + + // 3. Rebuild Tensor + tensor.ResetHolderWithType( + shared_holder, + static_cast(t[2].cast())); + tensor.Resize(phi::make_ddim(t[3].cast>())); + tensor.set_lod(t[4].cast()); + + return tensor; + }, + R"DOC( + Deserialize CPU lod tensor from shared memory. + + Params: + tuple: contrains ipc file name, data size, data type, + tensor dims and lod information. + + Examples: + .. code-block:: python + + import paddle + tensor = paddle.ones([3,3]) + metainfo = tensor.value().get_tensor()._share_filename() + tensor_from_shared = paddle.to_tensor(paddle.fluid.core.LoDTensor._new_shared_filename(metainfo)) + + )DOC") + .def("_shared_incref", + [](framework::Tensor &self) { + auto *mmap_allocation = dynamic_cast< + memory::allocation::RefcountedMemoryMapAllocation *>( + self.Holder().get()); + if (mmap_allocation) { + mmap_allocation->incref(); + } + }, + R"DOC( + Increase reference count of share_filename tensor. + )DOC") + .def("_shared_decref", + [](framework::Tensor &self) { + auto *mmap_allocation = dynamic_cast< + memory::allocation::RefcountedMemoryMapAllocation *>( + self.Holder().get()); + if (mmap_allocation) { + mmap_allocation->decref(); + } + }, + R"DOC( + Decrease reference count of share_filename tensor. 
+ )DOC") .def(py::pickle( [](const framework::Tensor &t) { // __getstate__ auto holder = t.Holder(); diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index e75b8d1f60b..b05f16a0606 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -557,6 +557,7 @@ if (APPLE OR WIN32) list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_exception) list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_iterable_dataset) list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dataset) + list(REMOVE_ITEM TEST_OPS test_paddle_multiprocessing) endif() if (NOT WITH_GLOO) @@ -1174,6 +1175,7 @@ if((WITH_ROCM OR WITH_GPU) AND NOT WIN32) test_collective_global_scatter PROPERTIES LABELS "RUN_TYPE=DIST") endif() + set_tests_properties(test_paddle_multiprocessing PROPERTIES TIMEOUT 120) set_tests_properties(test_reducescatter_api PROPERTIES TIMEOUT 120) set_tests_properties(test_broadcast PROPERTIES TIMEOUT 120) set_tests_properties(test_reducescatter PROPERTIES TIMEOUT 120) diff --git a/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py b/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py new file mode 100644 index 00000000000..1e31356a6bc --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py @@ -0,0 +1,199 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import gc +import sys +import unittest +import time +import paddle +import paddle.incubate.multiprocessing as mp + +REPEAT = 20 +HAS_SHM_FILES = os.path.isdir('/dev/shm') + + +def fill_tensor(queue, event): + data = queue.get() + with paddle.no_grad(): + data[0][:] = 5 + data[1][:] = 5 + + event.set() + + +def send_tensor(queue, event, device, dtype): + tensor = paddle.ones([5, 5], dtype=dtype) + queue.put(tensor) + queue.put(tensor) + event.wait() + + +def send_parambase(queue, event, device, dtype): + tensor = paddle.nn.Layer().create_parameter( + [5, 5], + dtype=dtype, + default_initializer=paddle.nn.initializer.Constant(value=1.0)) + queue.put(tensor) + queue.put(tensor) + event.wait() + + +class leak_checker(object): + def __init__(self, test_case): + self.checked_pids = [os.getpid()] + self.test_case = test_case + + def __enter__(self): + self.next_fds = self._get_next_fds(10) + return self + + def __exit__(self, *args): + if args[0] is None: + self.test_case.assertFalse(self.has_shm_files()) + return False + + def check_pid(self, pid): + self.checked_pids.append(pid) + + def _get_next_fds(self, n=1): + fds = [os.dup(0) for i in range(n)] + for fd in fds: + os.close(fd) + return fds + + def has_shm_files(self, wait=True): + if not HAS_SHM_FILES: + return False + result = self._has_shm_files() + if result and wait: + time.sleep(0.5) + return self._has_shm_files() + return result + + def _has_shm_files(self): + gc.collect() + names = ['paddle_' + str(pid) for pid in self.checked_pids] + for filename in os.listdir('/dev/shm'): + for name in names: + if filename.startswith(name): + print("have", filename) + return True + return False + + +class TestMultiprocessingBase(unittest.TestCase): + def get_tensor(self, device="cpu"): + self.device = device.lower() + place = None + tensor = paddle.zeros([5, 5], dtype="float32") + return tensor + + def get_parameter(self): + w = paddle.nn.Layer().create_parameter( + [10, 10], + default_initializer=paddle.nn.initializer.Constant(value=0.0)) + return w + + def _test_empty(self, dtype="float32"): + q = mp.Queue() + empty = paddle.to_tensor([], dtype=dtype) + q.put(empty) + out = q.get(timeout=1) + self.assertEqual(str(out), str(empty)) + + def _test_sharing(self, + ctx=mp, + device='cpu', + dtype="float32", + repeat=1, + param=False): + def test_fill(): + if param: + x = self.get_parameter() + y = (x[:, 1]).detach() + else: + x = self.get_tensor() + y = x[:, 1] + + data = [x, y] + + queue = ctx.Queue() + event = ctx.Event() + queue.put(data) + + process = ctx.Process(target=fill_tensor, args=(queue, event)) + process.daemon = True + lc.check_pid(process.pid) + process.start() + + event.wait(30) + + self.assertTrue(event.is_set()) + self.assertTrue(data[0].equal(5).all()) + self.assertTrue(data[1].equal(5).all()) + + process.join(1 if device != "gpu" else 10) + self.assertFalse(process.is_alive()) + + def test_receive(): + queue = ctx.Queue() + event = ctx.Event() + + process = ctx.Process( + target=send_parambase if param else send_tensor, + args=(queue, event, device, dtype)) + process.daemon = True + lc.check_pid(process.pid) + process.start() + + t1 = queue.get() + t2 = queue.get() + self.assertTrue(t1.equal(1).all()) + del t1, t2 + + event.set() + process.join(1 if device != "gpu" else 10) + self.assertFalse(process.is_alive()) + + with leak_checker(self) as lc: + for _ in range(repeat): + test_fill() + test_receive() + + +class TestMultiprocessingCpu(TestMultiprocessingBase): + def test_pass_tensor(self): + paddle.set_device("cpu") 
+ self._test_sharing(repeat=REPEAT) + + def test_pass_parambase(self): + paddle.set_device("cpu") + self._test_sharing(repeat=1, param=True) + + def test_pass_empty(self): + paddle.set_device("cpu") + self._test_empty() + + +class TestMultiprocessingGpu(TestMultiprocessingBase): + @unittest.skipIf(not paddle.fluid.core.is_compiled_with_cuda(), + "core is not compiled with CUDA") + def test_pass_tensor(self): + paddle.set_device("gpu") + self._test_sharing(mp.get_context("spawn"), "gpu") + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/incubate/multiprocessing/__init__.py b/python/paddle/incubate/multiprocessing/__init__.py new file mode 100644 index 00000000000..27c23be3a89 --- /dev/null +++ b/python/paddle/incubate/multiprocessing/__init__.py @@ -0,0 +1,27 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .reductions import init_reductions +import multiprocessing + +__all__ = [] + +from multiprocessing import * # noqa: F403 + +__all__ += multiprocessing.__all__ # type: ignore[attr-defined] + +# Only support linux for now +# Only support file_system sharing strategy. + +init_reductions() diff --git a/python/paddle/incubate/multiprocessing/reductions.py b/python/paddle/incubate/multiprocessing/reductions.py new file mode 100644 index 00000000000..cfbc55afd3b --- /dev/null +++ b/python/paddle/incubate/multiprocessing/reductions.py @@ -0,0 +1,189 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle + +# TODO: check the hooks of tensor +# TODO: check serializing named tensor +# TODO: check influence on autograd +import os +import sys +import warnings +import math +import copy +import threading +import multiprocessing +from multiprocessing.util import register_after_fork +from multiprocessing.reduction import ForkingPickler + +from collections import OrderedDict + + +def _supported_check(): + if sys.platform != "linux": + # warnings.warn("`paddle.multiprocessing` only support linux for now, " + # " import this will not take any effect !") + + return False + + if not sys.version_info >= (3, 4): + warnings.warn("Use `paddle.multiprocessing` to share paddle tensor " + "requires python version greater than 3.4 ." 
+ " `paddle.multiprocessing` will not take any effect !!!") + return False + + return True + + +class LRUSharedCache(OrderedDict): + def __init__(self): + self.limit = 128 + self._after_fork() + register_after_fork(self, LRUSharedCache._after_fork) + + def _after_fork(self): + self.lock = threading.Lock() + + def get(self, key): + with self.lock: + try: + value = super().pop(key) + super().__setitem__(key, value) + return value + except KeyError: + return None + + def __setitem__(self, key, value): + with self.lock: + try: + super().__delitem__(key) + except KeyError: + if len(self) >= self.limit: + super().popitem(last=False) + super().__setitem__(key, value) + + +shared_cache = LRUSharedCache() + + +def cuda_from_cache(key): + lodtensor = shared_cache.get(key) + if lodtensor is None: + return None + return lodtensor + + +def rebuild_tensor(cls, lodtensor, metadata): + if cls == paddle.fluid.framework.ParamBase: + tensor = paddle.fluid.framework.ParamBase(lodtensor.shape(), + lodtensor._dtype(), + **metadata) + tensor.value().get_tensor()._share_data_with(lodtensor) + else: + size, stop_gradient = metadata + tensor = paddle.fluid.core.VarBase() + if lodtensor._is_initialized(): + tensor.value().get_tensor()._share_data_with(lodtensor) + else: + tensor = paddle.to_tensor([], dtype=lodtensor._dtype()) + tensor.stop_gradient = stop_gradient + return tensor + + +def reduce_tensor(tensor): + lodtensor = tensor.value().get_tensor() + + if not tensor.stop_gradient and not tensor.is_leaf: + raise RuntimeError( + "Refusing to serialize non-leaf tensor which not stop_gradient, you can detach it!" + ) + # TODO: add serializing name and hooks check + if tensor.place.is_cpu_place() or tensor.place.is_gpu_place( + ) or tensor.place.is_cuda_pinned_place(): + if type(tensor) == paddle.fluid.framework.ParamBase: + metadata = copy.deepcopy(tensor.__dict__) + else: + metadata = (tensor.size, tensor.stop_gradient) + + return (rebuild_tensor, (type(tensor), lodtensor, metadata)) + else: + raise ValueError( + "Only support tensors of CPU/CUDA/CUDAPinned Place, Not support %s for now!" + % tensor.place) + + +def rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod): + lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod)) + lodtensor._shared_decref() + return lodtensor + + +def rebuild_cuda_tensor(cls, handle, offset_bytes, size, type_idx, dims, lod, + device_idx): + cache_tensor = cuda_from_cache((handle, offset_bytes)) + if cache_tensor is None: + lodtensor = cls._new_shared_cuda( + (handle, offset_bytes, size, type_idx, dims, lod, device_idx)) + # We only cache cuda shared tensor here. + # The opening cost of cudaIpcMemoryHandle is very high. + # Since we cache the recived tensor directly, + # The sender may reallocate the tensor space, + # you should manualy maintian the lifecycle of ipc tensor + shared_cache[(handle, offset_bytes)] = lodtensor + else: + lodtensor = paddle.fluid.core.LoDTensor() + lodtensor._share_buffer_with(cache_tensor, + (size, type_idx, dims, lod, device_idx)) + + return lodtensor + + +def rebuild_lodtensor_empty(cls): + #TODO: check if tensor initialized + #TODO: handle the dtype of empty tensor + return cls() + + +def reduce_lodtensor(lodtensor): + if lodtensor._place().is_cpu_place() or lodtensor._place( + ).is_cuda_pinned_place(): + for dim in lodtensor.shape(): + if dim == 0: + # Empty tensors have nothing be mmapped. 
+ return (rebuild_lodtensor_empty, (type(lodtensor), )) + + # Default to the share-filename strategy. + metadata = lodtensor._share_filename( + ) # ipc_name, size, type_idx, dims, lod + rebuild = rebuild_lodtensor_filename + lodtensor._shared_incref() + # TODO: maintain a reference to the lodtensor + # TODO: support the file_descriptor strategy + elif lodtensor._place().is_gpu_place(): + metadata = lodtensor._share_cuda() + rebuild = rebuild_cuda_tensor + else: + raise RuntimeError("Only CPU/GPU LoDTensor can be passed between processes for now!") + + return (rebuild, (type(lodtensor), ) + metadata) + + +def init_reductions(): + if not _supported_check(): + return + + ForkingPickler.register(paddle.Tensor, reduce_tensor) + ForkingPickler.register(paddle.fluid.core.VarBase, reduce_tensor) + ForkingPickler.register(paddle.fluid.framework.ParamBase, reduce_tensor) + ForkingPickler.register(paddle.fluid.core.LoDTensor, reduce_lodtensor) -- GitLab
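For reference, a hedged single-process illustration (Linux, CPU) of what `init_reductions()` changes: once the reducers above are registered, `ForkingPickler` serializes a Paddle tensor as shared-memory metadata rather than raw bytes, which is what `paddle.incubate.multiprocessing.Queue` relies on under the hood.

import pickle
from multiprocessing.reduction import ForkingPickler

import paddle
import paddle.incubate.multiprocessing  # importing this runs init_reductions()

paddle.set_device("cpu")
src = paddle.arange(6, dtype="float32").reshape([2, 3])
payload = ForkingPickler.dumps(src)   # reduce_tensor -> _share_filename metadata
clone = pickle.loads(payload)         # rebuild_tensor -> _new_shared_filename

with paddle.no_grad():
    clone[:] = -1.0                   # both tensors now map the same shm pages
print(src)                            # reflects the write made through `clone`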