Unverified commit e553f758, authored by Zhong Hui and committed by GitHub

[multiprocessing] Add paddle.incubate.multiprocessing for sharing tensors between python processes. (#37302)

* Add support for paddle.multiprocessing
* move multiprocessing to incubate.
Parent 95a526b2
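For orientation, a minimal sketch of what the new module enables. This is not part of the diff itself; the Queue/Process pattern mirrors the unit test added below and assumes a Linux build, where the module is supported:

import paddle
import paddle.incubate.multiprocessing as mp

def worker(queue):
    # The received tensor is backed by the sender's shared-memory mapping, not a copy.
    t = queue.get()
    print(t)

if __name__ == "__main__":
    paddle.set_device("cpu")
    queue = mp.Queue()
    queue.put(paddle.ones([5, 5], dtype="float32"))
    p = mp.Process(target=worker, args=(queue,))
    p.start()
    p.join()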
@@ -131,4 +131,7 @@ cc_library(virtual_memory_auto_growth_best_fit_allocator SRCS virtual_memory_aut
if(NOT WIN32)
cc_library(mmap_allocator SRCS mmap_allocator.cc DEPS allocator)
cc_test(mmap_allocator_test SRCS mmap_allocator_test.cc DEPS mmap_allocator allocator)
if (WITH_GPU)
cc_library(cuda_ipc_allocator SRCS cuda_ipc_allocator.cc DEPS allocator)
endif()
endif(NOT WIN32)
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _WIN32
#include "paddle/fluid/memory/allocation/cuda_ipc_allocator.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include <fcntl.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <random>
#include <string>
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace memory {
namespace allocation {
namespace {
std::mutex ipc_mutex_;
std::unordered_map<std::string, std::weak_ptr<void>> ipc_handle_to_baseptr_;
} // namespace
std::shared_ptr<void> GetIpcBasePtr(std::string handle) {
std::lock_guard<std::mutex> lock(ipc_mutex_);
auto iter = ipc_handle_to_baseptr_.find(handle);
if (iter != ipc_handle_to_baseptr_.end()) {
auto baseptr = iter->second.lock();
if (baseptr) return baseptr;
}
// The IpcMemHandle can only be opened once for the same handle,
// so we cache it here.
void *baseptr = nullptr;
auto ipc_handle =
reinterpret_cast<const cudaIpcMemHandle_t *>(handle.c_str());
PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcOpenMemHandle(
&baseptr, *ipc_handle, cudaIpcMemLazyEnablePeerAccess));
// Record the device id so the ipc handle is closed on the same device.
int device_id = platform::GetCurrentDeviceId();
// Add a deleter to close the ipc handle.
auto sp = std::shared_ptr<void>(baseptr, [handle, device_id](void *ptr) {
platform::CUDADeviceGuard guard(device_id);
std::lock_guard<std::mutex> lock(ipc_mutex_);
PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcCloseMemHandle(ptr));
ipc_handle_to_baseptr_.erase(handle);
VLOG(6) << "cudaIpcCloseMemHandle for ptr:"
<< "\t" << ptr;
});
std::weak_ptr<void> wp = sp;
ipc_handle_to_baseptr_.insert(iter, {handle, wp});
return sp;
}
CudaIpcAllocation::~CudaIpcAllocation() {
shared_ptr_.reset();
VLOG(6) << "tensor deleted cudaIpcCloseMemHandle for ptr:"
<< "\t" << this->ptr();
}
} // namespace allocation
} // namespace memory
} // namespace paddle
#endif
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _WIN32
#pragma once
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <unordered_set>
#include <utility>
#include "paddle/fluid/memory/allocation/allocator.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace memory {
namespace allocation {
std::shared_ptr<void> GetIpcBasePtr(std::string handle);
class CudaIpcAllocation : public Allocation {
public:
explicit CudaIpcAllocation(void *ptr, size_t size, int device_id,
std::shared_ptr<void> shared_ptr)
: Allocation(ptr, size, platform::CUDAPlace(device_id)),
device_id_(std::move(device_id)),
shared_ptr_(std::move(shared_ptr)) {}
inline const int &device_id() const { return device_id_; }
~CudaIpcAllocation() override;
private:
int device_id_;
std::shared_ptr<void> shared_ptr_;
};
} // namespace allocation
} // namespace memory
} // namespace paddle
#endif
@@ -29,6 +29,155 @@ namespace paddle {
namespace memory {
namespace allocation {
std::string GetIPCName() {
static std::random_device rd;
std::string handle = "/paddle_";
#ifdef _WIN32
handle += std::to_string(GetCurrentProcessId());
#else
handle += std::to_string(getpid());
#endif
handle += "_";
handle += std::to_string(rd());
return handle;
}
struct CountInfo {
std::atomic<int> refcount;
};
void AllocateMemoryMap(std::string filename, int flags, size_t size,
void **map_ptr_, int *fd_) {
// TODO(@ZHUI): support win32
int file_flags = 0;
int fd = -1;
if (flags & MAPPED_SHAREDMEM) {
file_flags = O_RDWR | O_CREAT;
} else {
file_flags = O_RDONLY;
}
if (flags & MAPPED_EXCLUSIVE) {
file_flags |= O_EXCL;
}
if (flags & MAPPED_NOCREATE) {
file_flags &= ~O_CREAT;
}
if (!(flags & MAPPED_FROMFD)) {
if (flags & MAPPED_SHAREDMEM) {
fd = shm_open(filename.c_str(), file_flags, (mode_t)0600);
PADDLE_ENFORCE_NE(
fd, -1,
platform::errors::Unavailable(
"File descriptor %s open failed, unable in read-write mode",
filename.c_str()));
VLOG(6) << "shm_open: " << filename;
}
} else {
fd = -1;
}
PADDLE_ENFORCE_EQ(ftruncate(fd, size), 0,
platform::errors::Unavailable(
"Fruncate a file to a specified length failed!"));
if (flags & MAPPED_SHAREDMEM) {
*map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
} else {
*map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
}
PADDLE_ENFORCE_NE(*map_ptr_, MAP_FAILED,
platform::errors::Unavailable(
"Memory map failed when create shared memory."));
if (flags & MAPPED_KEEPFD) {
*fd_ = fd;
} else {
PADDLE_ENFORCE_NE(::close(fd), -1,
platform::errors::Unavailable(
"Error closing memory maped file <", filename, ">"));
*fd_ = -1;
}
}
std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename, int flags,
size_t size) {
int fd = -1;
void *base_ptr = nullptr;
AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd);
void *aligned_base_ptr =
static_cast<void *>(static_cast<char *>(base_ptr) + mmap_alignment);
return std::make_shared<RefcountedMemoryMapAllocation>(aligned_base_ptr, size,
filename, flags, fd);
}
RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
void *ptr, size_t size, std::string ipc_name, int flags, int fd)
: MemoryMapAllocation(ptr, size, ipc_name, flags, fd) {
// must reset base ptr first.
resetBaseptr();
initializeRefercount();
}
void MemoryMapAllocation::close() {
if (closed_) {
return;
}
closed_ = true;
}
MemoryMapAllocation::~MemoryMapAllocation() { close(); }
void RefcountedMemoryMapAllocation::incref() {
CountInfo *info = static_cast<CountInfo *>(map_ptr_);
++info->refcount;
}
int RefcountedMemoryMapAllocation::decref() {
CountInfo *info = static_cast<CountInfo *>(map_ptr_);
return --info->refcount == 0;
}
void RefcountedMemoryMapAllocation::resetBaseptr() {
map_ptr_ =
static_cast<void *>(static_cast<char *>(map_ptr_) - mmap_alignment);
map_size_ = map_size_ + mmap_alignment;
}
void RefcountedMemoryMapAllocation::initializeRefercount() {
CountInfo *info = reinterpret_cast<CountInfo *>(map_ptr_);
if (flags_ & MAPPED_EXCLUSIVE) {
new (&info->refcount) std::atomic<int>(1);
} else {
info->refcount++;
}
}
void RefcountedMemoryMapAllocation::close() {
if (closed_) {
return;
}
closed_ = true;
void *data = map_ptr_;
CountInfo *info = reinterpret_cast<CountInfo *>(data);
if (--info->refcount == 0) {
PADDLE_ENFORCE_NE(
shm_unlink(ipc_name_.c_str()), -1,
platform::errors::Unavailable(
"could not unlink the shared memory file ", ipc_name_));
VLOG(6) << "shm_unlink file: " << ipc_name_;
}
PADDLE_ENFORCE_NE(
munmap(map_ptr_, map_size_), -1,
platform::errors::Unavailable("could not unmap the shared memory file: ",
strerror(errno), " (", errno, ")"));
}
MemoryMapWriterAllocation::~MemoryMapWriterAllocation() {
PADDLE_ENFORCE_NE(
munmap(this->ptr(), this->size()), -1,
@@ -44,30 +193,30 @@ MemoryMapReaderAllocation::~MemoryMapReaderAllocation() {
/* Here we do not pay attention to the result of shm_unlink,
because the memory mapped file may already have been cleared by
MemoryMapFdSet::Clear(). */
// Code of the DataLoader subprocess:
//
// core._array_to_share_memory_tensor(b)
// out_queue.put((idx, tensor_list, structure))
// core._remove_tensor_list_mmap_fds(tensor_list)
/* If the tensor is already in the send queue, it will be destructed
* by that function. If the tensor has not been sent yet, it will be
* cleared by MemoryMapFdSet::Clear().
* If `_remove_tensor_list_mmap_fds` has been interrupted, the tensor
* will be cleared by both methods.
* */
shm_unlink(this->ipc_name().c_str());
MemoryMapFdSet::Instance().Remove(this->ipc_name());
VLOG(3) << "~MemoryMapReaderAllocation: " << this->ipc_name();
}
std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
size_t size) {
const std::string &ipc_name = GetIPCName();
int flags = O_RDWR | O_CREAT;
int fd = shm_open(ipc_name.c_str(), flags, 0600);
PADDLE_ENFORCE_NE(
fd, -1, platform::errors::Unavailable("File descriptor %s open failed",
ipc_name.c_str()));
@@ -86,12 +235,14 @@ std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
std::shared_ptr<MemoryMapReaderAllocation> RebuildMemoryMapReaderAllocation(
const std::string &ipc_name, size_t size) {
int flags = O_RDWR | O_CREAT;
flags &= ~O_CREAT;
int fd = shm_open(ipc_name.c_str(), flags, 0600);
PADDLE_ENFORCE_NE(
fd, -1, platform::errors::Unavailable("File descriptor %s open failed",
ipc_name.c_str()));
void *ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
PADDLE_ENFORCE_NE(ptr, MAP_FAILED,
platform::errors::Unavailable(
"Memory map failed when rebuild shared memory."));
......
@@ -16,8 +16,9 @@
#ifndef _WIN32
#include <atomic>
#include <memory>
#include <mutex> // NOLINT
#include <mutex>
#include <string>
#include <unordered_set>
#include <utility>
@@ -28,6 +29,72 @@ namespace paddle {
namespace memory {
namespace allocation {
std::string GetIPCName();
static constexpr int64_t mmap_alignment = 64;
enum MappedModes {
MAPPED_SHAREDMEM = 1,
MAPPED_EXCLUSIVE = 2,
MAPPED_NOCREATE = 4,
MAPPED_KEEPFD = 8,
MAPPED_FROMFD = 16,
MAPPED_UNLINK = 32
};
class MemoryMapAllocation : public Allocation {
public:
explicit MemoryMapAllocation(void *ptr, size_t size, std::string ipc_name)
: Allocation(ptr, size, platform::CPUPlace()),
ipc_name_(std::move(ipc_name)),
map_ptr_(ptr),
map_size_(size) {}
explicit MemoryMapAllocation(void *ptr, size_t size, std::string ipc_name,
int flags, int fd)
: Allocation(ptr, size, platform::CPUPlace()),
ipc_name_(std::move(ipc_name)),
fd_(fd),
flags_(flags),
map_ptr_(ptr),
map_size_(size) {}
inline const std::string &ipc_name() const { return ipc_name_; }
virtual void close();
~MemoryMapAllocation() override;
protected:
std::string ipc_name_;
int fd_ = -1;
int flags_ = 0;
void *map_ptr_ = nullptr;
size_t map_size_ = 0;
bool closed_ = false;
};
class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
public:
RefcountedMemoryMapAllocation(void *ptr, size_t size, std::string ipc_name,
int flags, int fd);
void incref();
int decref();
void close() override;
virtual ~RefcountedMemoryMapAllocation() { close(); }
protected:
void initializeRefercount();
void resetBaseptr();
};
void AllocateMemoryMap(std::string filename, int flags, size_t size,
void **base_ptr_, int *fd_);
std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename, int flags,
size_t size);
class MemoryMapWriterAllocation : public Allocation {
public:
explicit MemoryMapWriterAllocation(void *ptr, size_t size,
......
@@ -44,6 +44,9 @@ endif()
if(NOT WIN32)
set(PYBIND_DEPS ${PYBIND_DEPS} data_loader)
set(PYBIND_DEPS ${PYBIND_DEPS} mmap_allocator)
if (WITH_GPU)
set(PYBIND_DEPS ${PYBIND_DEPS} cuda_ipc_allocator)
endif()
if (WITH_NCCL OR WITH_RCCL)
set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context)
set(PYBIND_DEPS ${PYBIND_DEPS} heter_ccl_context)
......
@@ -64,6 +64,9 @@ limitations under the License. */
#include "paddle/fluid/imperative/amp_auto_cast.h"
#include "paddle/fluid/imperative/layer.h"
#include "paddle/fluid/memory/allocation/allocator_strategy.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/memory/allocation/cuda_ipc_allocator.h"
#endif
#include "paddle/fluid/memory/allocation/mmap_allocator.h"
#include "paddle/fluid/operators/activation_op.h"
#include "paddle/fluid/operators/common_infer_shape_functions.h"
@@ -1187,6 +1190,287 @@ PYBIND11_MODULE(core_noavx, m) {
});
#else
})
#ifdef PADDLE_WITH_CUDA
.def("_share_buffer_with",
[](framework::Tensor &self, const framework::Tensor src,
py::tuple t) {
auto *cuda_ipc_allocation =
dynamic_cast<memory::allocation::CudaIpcAllocation *>(
src.Holder().get());
PADDLE_ENFORCE_NOT_NULL(
cuda_ipc_allocation,
platform::errors::PreconditionNotMet(
"Tensor is not Cuda IPC shared tensor. "
"Now only Tensor shared by cuda ipc could use this "
"api."));
size_t size = t[0].cast<size_t>();
auto dtype =
static_cast<paddle::experimental::DataType>(t[1].cast<int>());
auto dims = phi::make_ddim(t[2].cast<std::vector<int>>());
auto lod_info = t[3].cast<framework::LoD>();
auto device_id = t[4].cast<int>();
auto shared_reader_holder =
std::make_shared<memory::allocation::Allocation>(
cuda_ipc_allocation->ptr(),
cuda_ipc_allocation->base_ptr(), size,
platform::CUDAPlace(device_id));
self.ResetHolderWithType(shared_reader_holder, dtype);
self.Resize(dims);
self.set_lod(lod_info);
VLOG(6) << "Reconstructed tensor with buffer shared!";
},
R"DOC(
Deserialize a GPU Tensor from an existing shared CUDA IPC tensor.
Params:
tensor: shared CUDA IPC tensor.
tuple: contains data size, data type,
tensor dims, lod information, device index.
)DOC")
.def("_share_cuda",
[](framework::Tensor self) {
if (!self.IsInitialized() || self.numel() == 0)
throw std::runtime_error(
"Tensor not initialized or numel is 0. could not pass "
"to shared memory. ");
auto *holder = dynamic_cast<memory::allocation::Allocation *>(
self.Holder().get());
PADDLE_ENFORCE_EQ(
platform::is_gpu_place(holder->place()), true,
platform::errors::InvalidArgument(
"Tensor is not on GPU. share_cuda only support GPU "
"Tensor, share_filename is for CPU tensor."));
void *base_ptr = holder->base_ptr();
ptrdiff_t offset_bytes = reinterpret_cast<char *>(holder->ptr()) -
reinterpret_cast<char *>(base_ptr);
cudaIpcMemHandle_t handle;
PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcGetMemHandle(&handle, base_ptr));
auto _handle = py::bytes(reinterpret_cast<char *>(&handle),
(py::ssize_t)CUDA_IPC_HANDLE_SIZE);
// TODO(ZHUI): use cuda event, to avoid sync.
const auto &device_id = paddle::platform::GetCurrentDeviceId();
auto stream =
paddle::platform::stream::get_current_stream(device_id);
stream->Synchronize();
int type_idx = static_cast<int>(self.type());
size_t data_size =
self.numel() *
framework::SizeOfType(
framework::TransToProtoVarType(self.type()));
return py::make_tuple(_handle, (py::size_t)offset_bytes, data_size,
type_idx, vectorize(self.dims()), self.lod(),
device_id);
},
R"DOC(
Serialize GPU Tensor by cudaIpcMemHandle.
Returns:
tuple: contains handle, data size, data type,
tensor dims, lod information, device index.
Examples:
.. code-block:: python
import paddle
tensor = paddle.ones([3,3])
metainfo = tensor.value().get_tensor()._share_cuda()
)DOC")
.def("_new_shared_cuda",
[](py::tuple t) {
if (t.size() != 7)
throw std::runtime_error(
"Invalid Tensor meta info for shared cuda tensor!");
// 1. Create a new C++ instance
framework::Tensor tensor;
// 2. Rebuild Allocation from handle
const std::string &handle = t[0].cast<std::string>();
ptrdiff_t offset_bytes = (ptrdiff_t)t[1].cast<int64_t>();
auto device_id = t[6].cast<int>();
auto base_ptr = memory::allocation::GetIpcBasePtr(handle);
size_t size = t[2].cast<size_t>();
void *dev = base_ptr.get();
dev = reinterpret_cast<char *>(dev) + offset_bytes;
auto shared_reader_holder =
std::make_shared<memory::allocation::CudaIpcAllocation>(
dev, size, device_id, std::move(base_ptr));
// 3. Rebuild Tensor
tensor.ResetHolderWithType(
shared_reader_holder,
static_cast<paddle::experimental::DataType>(t[3].cast<int>()));
tensor.Resize(phi::make_ddim(t[4].cast<std::vector<int>>()));
tensor.set_lod(t[5].cast<framework::LoD>());
return tensor;
},
R"DOC(
Deserialize GPU lod tensor from cudaIpcMemHandle.
Params:
tuple: contains handle, data size, data type,
tensor dims, lod information, device index.
Examples:
.. code-block:: python
import paddle
tensor = paddle.ones([3,3])
metainfo = tensor.value().get_tensor()._share_cuda()
tensor_from_shared = paddle.to_tensor(paddle.fluid.core.LoDTensor._new_shared_cuda(metainfo))
)DOC")
#endif
.def("_share_filename",
[](framework::Tensor &self) {
if (!self.IsInitialized() || self.numel() == 0)
throw std::runtime_error(
"Tensor not initialized or numel is 0. could not pass to "
"shared memory. ");
auto holder = self.Holder();
PADDLE_ENFORCE_EQ(
platform::is_cpu_place(holder->place()) ||
platform::is_cuda_pinned_place(holder->place()),
true, platform::errors::InvalidArgument(
"Tensor is not on CPU. share_filename only "
"support CPU Tensor."));
auto *mmap_allocation = dynamic_cast<
memory::allocation::RefcountedMemoryMapAllocation *>(
holder.get());
// If the tensor is not shared, allocate memory map allocation.
if (mmap_allocation == nullptr) {
void *data_ptr = self.data();
size_t data_size =
self.numel() *
framework::SizeOfType(
framework::TransToProtoVarType(self.type()));
int flags = memory::allocation::MAPPED_SHAREDMEM |
memory::allocation::MAPPED_EXCLUSIVE;
std::string handle = memory::allocation::GetIPCName();
auto shared_holder =
memory::allocation::AllocateRefcountedMemoryMapAllocation(
handle, flags, data_size);
// copy data & reset holder
if (platform::is_cuda_pinned_place(holder->place())) {
#ifdef PADDLE_WITH_CUDA
memory::Copy(platform::CPUPlace(), shared_holder->ptr(),
platform::CUDAPinnedPlace(), data_ptr, data_size);
#endif
} else {
memory::Copy(platform::CPUPlace(), shared_holder->ptr(),
platform::CPUPlace(), data_ptr, data_size);
}
self.ResetHolder(shared_holder);
mmap_allocation = shared_holder.get();
}
int type_idx = static_cast<int>(self.type());
return py::make_tuple(mmap_allocation->ipc_name(),
mmap_allocation->size(), type_idx,
vectorize(self.dims()), self.lod());
},
R"DOC(
Serialize CPU lod tensor in shared memory to tuple.
If the tensor is not in shared memory, we will copy it first.
Returns:
tuple: contains ipc name, data size, data type,
tensor dims and lod information.
Examples:
.. code-block:: python
import paddle
tensor = paddle.ones([3,3])
metainfo = tensor.value().get_tensor()._share_filename()
)DOC")
.def("_new_shared_filename",
[](py::tuple t) { // __setstate__
if (t.size() != 5)
throw std::runtime_error("Invalid Tensor meta info state!");
framework::Tensor tensor;
// 2. Rebuild Allocation
const std::string &ipc_name = t[0].cast<std::string>();
size_t size = t[1].cast<size_t>();
int flags = memory::allocation::MAPPED_SHAREDMEM |
memory::allocation::MAPPED_NOCREATE;
auto shared_holder =
memory::allocation::AllocateRefcountedMemoryMapAllocation(
ipc_name, flags, size);
// 3. Rebuild Tensor
tensor.ResetHolderWithType(
shared_holder,
static_cast<paddle::experimental::DataType>(t[2].cast<int>()));
tensor.Resize(phi::make_ddim(t[3].cast<std::vector<int>>()));
tensor.set_lod(t[4].cast<framework::LoD>());
return tensor;
},
R"DOC(
Deserialize CPU lod tensor from shared memory.
Params:
tuple: contains ipc file name, data size, data type,
tensor dims and lod information.
Examples:
.. code-block:: python
import paddle
tensor = paddle.ones([3,3])
metainfo = tensor.value().get_tensor()._share_filename()
tensor_from_shared = paddle.to_tensor(paddle.fluid.core.LoDTensor._new_shared_filename(metainfo))
)DOC")
.def("_shared_incref",
[](framework::Tensor &self) {
auto *mmap_allocation = dynamic_cast<
memory::allocation::RefcountedMemoryMapAllocation *>(
self.Holder().get());
if (mmap_allocation) {
mmap_allocation->incref();
}
},
R"DOC(
Increase reference count of share_filename tensor.
)DOC")
.def("_shared_decref",
[](framework::Tensor &self) {
auto *mmap_allocation = dynamic_cast<
memory::allocation::RefcountedMemoryMapAllocation *>(
self.Holder().get());
if (mmap_allocation) {
mmap_allocation->decref();
}
},
R"DOC(
Decrease reference count of share_filename tensor.
)DOC")
.def(py::pickle(
[](const framework::Tensor &t) { // __getstate__
auto holder = t.Holder();
......
@@ -557,6 +557,7 @@ if (APPLE OR WIN32)
list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_exception)
list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_iterable_dataset)
list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dataset)
list(REMOVE_ITEM TEST_OPS test_paddle_multiprocessing)
endif()
if (NOT WITH_GLOO)
@@ -1174,6 +1175,7 @@ if((WITH_ROCM OR WITH_GPU) AND NOT WIN32)
test_collective_global_scatter
PROPERTIES LABELS "RUN_TYPE=DIST")
endif()
set_tests_properties(test_paddle_multiprocessing PROPERTIES TIMEOUT 120)
set_tests_properties(test_reducescatter_api PROPERTIES TIMEOUT 120)
set_tests_properties(test_broadcast PROPERTIES TIMEOUT 120)
set_tests_properties(test_reducescatter PROPERTIES TIMEOUT 120)
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import gc
import sys
import unittest
import time
import paddle
import paddle.incubate.multiprocessing as mp
REPEAT = 20
HAS_SHM_FILES = os.path.isdir('/dev/shm')
def fill_tensor(queue, event):
data = queue.get()
with paddle.no_grad():
data[0][:] = 5
data[1][:] = 5
event.set()
def send_tensor(queue, event, device, dtype):
tensor = paddle.ones([5, 5], dtype=dtype)
queue.put(tensor)
queue.put(tensor)
event.wait()
def send_parambase(queue, event, device, dtype):
tensor = paddle.nn.Layer().create_parameter(
[5, 5],
dtype=dtype,
default_initializer=paddle.nn.initializer.Constant(value=1.0))
queue.put(tensor)
queue.put(tensor)
event.wait()
class leak_checker(object):
def __init__(self, test_case):
self.checked_pids = [os.getpid()]
self.test_case = test_case
def __enter__(self):
self.next_fds = self._get_next_fds(10)
return self
def __exit__(self, *args):
if args[0] is None:
self.test_case.assertFalse(self.has_shm_files())
return False
def check_pid(self, pid):
self.checked_pids.append(pid)
def _get_next_fds(self, n=1):
fds = [os.dup(0) for i in range(n)]
for fd in fds:
os.close(fd)
return fds
def has_shm_files(self, wait=True):
if not HAS_SHM_FILES:
return False
result = self._has_shm_files()
if result and wait:
time.sleep(0.5)
return self._has_shm_files()
return result
def _has_shm_files(self):
gc.collect()
names = ['paddle_' + str(pid) for pid in self.checked_pids]
for filename in os.listdir('/dev/shm'):
for name in names:
if filename.startswith(name):
print("have", filename)
return True
return False
class TestMultiprocessingBase(unittest.TestCase):
def get_tensor(self, device="cpu"):
self.device = device.lower()
place = None
tensor = paddle.zeros([5, 5], dtype="float32")
return tensor
def get_parameter(self):
w = paddle.nn.Layer().create_parameter(
[10, 10],
default_initializer=paddle.nn.initializer.Constant(value=0.0))
return w
def _test_empty(self, dtype="float32"):
q = mp.Queue()
empty = paddle.to_tensor([], dtype=dtype)
q.put(empty)
out = q.get(timeout=1)
self.assertEqual(str(out), str(empty))
def _test_sharing(self,
ctx=mp,
device='cpu',
dtype="float32",
repeat=1,
param=False):
def test_fill():
if param:
x = self.get_parameter()
y = (x[:, 1]).detach()
else:
x = self.get_tensor()
y = x[:, 1]
data = [x, y]
queue = ctx.Queue()
event = ctx.Event()
queue.put(data)
process = ctx.Process(target=fill_tensor, args=(queue, event))
process.daemon = True
lc.check_pid(process.pid)
process.start()
event.wait(30)
self.assertTrue(event.is_set())
self.assertTrue(data[0].equal(5).all())
self.assertTrue(data[1].equal(5).all())
process.join(1 if device != "gpu" else 10)
self.assertFalse(process.is_alive())
def test_receive():
queue = ctx.Queue()
event = ctx.Event()
process = ctx.Process(
target=send_parambase if param else send_tensor,
args=(queue, event, device, dtype))
process.daemon = True
lc.check_pid(process.pid)
process.start()
t1 = queue.get()
t2 = queue.get()
self.assertTrue(t1.equal(1).all())
del t1, t2
event.set()
process.join(1 if device != "gpu" else 10)
self.assertFalse(process.is_alive())
with leak_checker(self) as lc:
for _ in range(repeat):
test_fill()
test_receive()
class TestMultiprocessingCpu(TestMultiprocessingBase):
def test_pass_tensor(self):
paddle.set_device("cpu")
self._test_sharing(repeat=REPEAT)
def test_pass_parambase(self):
paddle.set_device("cpu")
self._test_sharing(repeat=1, param=True)
def test_pass_empty(self):
paddle.set_device("cpu")
self._test_empty()
class TestMultiprocessingGpu(TestMultiprocessingBase):
@unittest.skipIf(not paddle.fluid.core.is_compiled_with_cuda(),
"core is not compiled with CUDA")
def test_pass_tensor(self):
paddle.set_device("gpu")
self._test_sharing(mp.get_context("spawn"), "gpu")
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .reductions import init_reductions
import multiprocessing
__all__ = []
from multiprocessing import * # noqa: F403
__all__ += multiprocessing.__all__ # type: ignore[attr-defined]
# Only Linux is supported for now.
# Only the file_system sharing strategy is supported.
init_reductions()
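Because this module simply re-exports the standard multiprocessing API after registering the tensor reducers, the usual start-method rules still apply; the GPU unit test above uses the spawn context. A rough GPU-side sketch, assuming a CUDA build on Linux:

import paddle
import paddle.incubate.multiprocessing as mp

def consumer(queue, event):
    # Rebuilt from a cudaIpcMemHandle; it maps the sender's device memory.
    t = queue.get()
    event.set()

if __name__ == "__main__":
    paddle.set_device("gpu")
    ctx = mp.get_context("spawn")  # fork is unsafe once CUDA has been initialized
    queue, event = ctx.Queue(), ctx.Event()
    tensor = paddle.ones([5, 5])
    queue.put(tensor)
    p = ctx.Process(target=consumer, args=(queue, event))
    p.start()
    event.wait(30)  # keep `tensor` alive until the consumer is done with it
    p.join()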
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
# TODO: check the hooks of tensor
# TODO: check serializing named tensor
# TODO: check influence on autograd
import os
import sys
import warnings
import math
import copy
import threading
import multiprocessing
from multiprocessing.util import register_after_fork
from multiprocessing.reduction import ForkingPickler
from collections import OrderedDict
def _supported_check():
if sys.platform != "linux":
# warnings.warn("`paddle.multiprocessing` only support linux for now, "
# " import this will not take any effect !")
return False
if not sys.version_info >= (3, 4):
warnings.warn("Use `paddle.multiprocessing` to share paddle tensor "
"requires python version greater than 3.4 ."
" `paddle.multiprocessing` will not take any effect !!!")
return False
return True
class LRUSharedCache(OrderedDict):
def __init__(self):
self.limit = 128
self._after_fork()
register_after_fork(self, LRUSharedCache._after_fork)
def _after_fork(self):
self.lock = threading.Lock()
def get(self, key):
with self.lock:
try:
value = super().pop(key)
super().__setitem__(key, value)
return value
except KeyError:
return None
def __setitem__(self, key, value):
with self.lock:
try:
super().__delitem__(key)
except KeyError:
if len(self) >= self.limit:
super().popitem(last=False)
super().__setitem__(key, value)
shared_cache = LRUSharedCache()
def cuda_from_cache(key):
lodtensor = shared_cache.get(key)
if lodtensor is None:
return None
return lodtensor
def rebuild_tensor(cls, lodtensor, metadata):
if cls == paddle.fluid.framework.ParamBase:
tensor = paddle.fluid.framework.ParamBase(lodtensor.shape(),
lodtensor._dtype(),
**metadata)
tensor.value().get_tensor()._share_data_with(lodtensor)
else:
size, stop_gradient = metadata
tensor = paddle.fluid.core.VarBase()
if lodtensor._is_initialized():
tensor.value().get_tensor()._share_data_with(lodtensor)
else:
tensor = paddle.to_tensor([], dtype=lodtensor._dtype())
tensor.stop_gradient = stop_gradient
return tensor
def reduce_tensor(tensor):
lodtensor = tensor.value().get_tensor()
if not tensor.stop_gradient and not tensor.is_leaf:
raise RuntimeError(
"Refusing to serialize non-leaf tensor which not stop_gradient, you can detach it!"
)
# TODO: add serializing name and hooks check
if tensor.place.is_cpu_place() or tensor.place.is_gpu_place(
) or tensor.place.is_cuda_pinned_place():
if type(tensor) == paddle.fluid.framework.ParamBase:
metadata = copy.deepcopy(tensor.__dict__)
else:
metadata = (tensor.size, tensor.stop_gradient)
return (rebuild_tensor, (type(tensor), lodtensor, metadata))
else:
raise ValueError(
"Only support tensors of CPU/CUDA/CUDAPinned Place, Not support %s for now!"
% tensor.place)
def rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod))
lodtensor._shared_decref()
return lodtensor
def rebuild_cuda_tensor(cls, handle, offset_bytes, size, type_idx, dims, lod,
device_idx):
cache_tensor = cuda_from_cache((handle, offset_bytes))
if cache_tensor is None:
lodtensor = cls._new_shared_cuda(
(handle, offset_bytes, size, type_idx, dims, lod, device_idx))
# We only cache cuda shared tensors here, because the cost of
# opening a cudaIpcMemHandle is very high. Since we cache the
# received tensor directly and the sender may reallocate the tensor
# space, you should manually maintain the lifecycle of the ipc tensor.
shared_cache[(handle, offset_bytes)] = lodtensor
else:
lodtensor = paddle.fluid.core.LoDTensor()
lodtensor._share_buffer_with(cache_tensor,
(size, type_idx, dims, lod, device_idx))
return lodtensor
def rebuild_lodtensor_empty(cls):
# TODO: check if tensor initialized
# TODO: handle the dtype of empty tensor
return cls()
def reduce_lodtensor(lodtensor):
if lodtensor._place().is_cpu_place() or lodtensor._place(
).is_cuda_pinned_place():
for dim in lodtensor.shape():
if dim == 0:
# Empty tensors have nothing to be mmapped.
return (rebuild_lodtensor_empty, (type(lodtensor), ))
# By default, use the share_filename strategy.
metadata = lodtensor._share_filename(
) # ipc_name, size, type_idx, dims, lod
rebuild = rebuild_lodtensor_filename
lodtensor._shared_incref()
# TODO: maintain reference for lodtensor
# TODO: support the file_descriptor strategy
elif lodtensor._place().is_gpu_place():
metadata = lodtensor._share_cuda()
rebuild = rebuild_cuda_tensor
else:
raise RuntimeError("We only support pass cpu/gpu lodtensor for now!")
return (rebuild, (type(lodtensor), ) + metadata)
def init_reductions():
if not _supported_check():
return
ForkingPickler.register(paddle.Tensor, reduce_tensor)
ForkingPickler.register(paddle.fluid.core.VarBase, reduce_tensor)
ForkingPickler.register(paddle.fluid.framework.ParamBase, reduce_tensor)
ForkingPickler.register(paddle.fluid.core.LoDTensor, reduce_lodtensor)
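The registered reduce/rebuild pairs can also be called directly, which makes the mechanism easier to see. A rough in-process illustration; the import path is assumed from the package layout above:

import paddle
from paddle.incubate.multiprocessing.reductions import reduce_tensor

t = paddle.ones([3, 3])
rebuild_fn, args = reduce_tensor(t)  # what ForkingPickler invokes on put()
t2 = rebuild_fn(*args)               # what the receiving process runs after unpickling
# Both tensors now share the same underlying LoDTensor storage.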