From e973bd732d80c4c9ffa0395c1db7d80b9dfabda5 Mon Sep 17 00:00:00 2001
From: Kaipeng Deng
Date: Wed, 31 Mar 2021 15:16:31 +0800
Subject: [PATCH] Polish tensor pipeline (#31701)

* polish tensor pipeline. test=develop
---
 paddle/fluid/pybind/imperative.cc        | 62 +++++++++++++++++++
 python/paddle/fluid/core.py              |  2 +
 .../fluid/dataloader/dataloader_iter.py  |  8 ++-
 python/paddle/fluid/dataloader/flat.py   | 12 +---
 python/paddle/fluid/dataloader/worker.py |  6 +-
 5 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc
index eed3b3b7691..40cf6cd84be 100644
--- a/paddle/fluid/pybind/imperative.cc
+++ b/paddle/fluid/pybind/imperative.cc
@@ -494,6 +494,39 @@ void BindImperative(py::module *m_ptr) {
         },
         py::return_value_policy::take_ownership);
 
+  m.def("_array_to_share_memory_tensor",
+        [](py::object &obj) {
+          // 1. cast to python array
+          auto array = obj.cast<py::array>();
+          PADDLE_ENFORCE_NE(
+              string::Sprintf("%s", array.dtype()).compare("object"), 0,
+              platform::errors::InvalidArgument(
+                  "Failed to convert input data to a regular ndarray.\n * "
+                  "Usually this means the input data contains nested "
+                  "lists with different lengths.\n * Check the reader "
+                  "function passed to 'set_(sample/sample_list/batch)"
+                  "_generator' to locate the data that causes this issue."));
+          // 2. construct LoDTensor
+          framework::LoDTensor t;
+          SetTensorFromPyArray<platform::CPUPlace>(&t, array,
+                                                   platform::CPUPlace(), true);
+          // 3. allocate shared memory
+          void *data_ptr = t.data<void>();
+          size_t data_size = t.numel() * framework::SizeOfType(t.type());
+          auto shared_writer_holder =
+              memory::allocation::AllocateMemoryMapWriterAllocation(data_size);
+          // 4. maintain mmap fd set & backup ipc_name
+          const std::string &ipc_name = shared_writer_holder->ipc_name();
+          memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name);
+          // 5. copy data & reset holder
+          memory::Copy(platform::CPUPlace(), shared_writer_holder->ptr(),
+                       platform::CPUPlace(), data_ptr, data_size);
+          t.ResetHolder(shared_writer_holder);
+
+          return t;
+        },
+        py::return_value_policy::take_ownership);
+
   m.def("_remove_tensor_list_mmap_fds", [](py::list &tensor_list) {
     for (size_t i = 0; i < tensor_list.size(); ++i) {
       auto t = tensor_list[i].cast<framework::LoDTensor>();
@@ -1111,6 +1144,35 @@ void BindImperative(py::module *m_ptr) {
               y = x.cuda(1)
               print(y.place)        # CUDAPlace(1)
       )DOC")
+      .def("_share_memory",
+           [](const std::shared_ptr<imperative::VarBase> &self) {
+#ifndef _WIN32
+             PADDLE_ENFORCE_EQ(
+                 platform::is_cpu_place(self->Place()), true,
+                 platform::errors::InvalidArgument(
+                     "Sharing memory only supports CPU Tensor currently"));
+             // 1. get LoDTensor
+             auto *t = self->MutableVar()->GetMutable<framework::LoDTensor>();
+             // 2. allocate shared memory
+             void *data_ptr = t->data<void>();
+             size_t data_size = t->numel() * framework::SizeOfType(t->type());
+             auto shared_writer_holder =
+                 memory::allocation::AllocateMemoryMapWriterAllocation(
+                     data_size);
+             // 3. maintain mmap fd set & backup ipc_name
+             const std::string &ipc_name = shared_writer_holder->ipc_name();
+             memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name);
+             // 4. copy data & reset holder
+             memory::Copy(platform::CPUPlace(), shared_writer_holder->ptr(),
+                          platform::CPUPlace(), data_ptr, data_size);
+             t->ResetHolder(shared_writer_holder);
+             return *t;
+#else
+             PADDLE_THROW(platform::errors::PermissionDenied(
+                 "Sharing memory in Windows OS is not supported currently"));
+#endif
+           },
+           py::return_value_policy::reference)
       .def("copy_", &imperative::VarBase::CopyFrom)
       .def("_copy_to",
            [](const std::shared_ptr<imperative::VarBase> &self,
diff --git a/python/paddle/fluid/core.py b/python/paddle/fluid/core.py
index 4c24eb3d7fc..d3dc26c946d 100644
--- a/python/paddle/fluid/core.py
+++ b/python/paddle/fluid/core.py
@@ -279,6 +279,7 @@ if avx_supported():
         from .core_avx import _set_process_signal_handler
         from .core_avx import _throw_error_if_process_failed
         from .core_avx import _convert_to_tensor_list
+        from .core_avx import _array_to_share_memory_tensor
         from .core_avx import _cleanup_mmap_fds
         from .core_avx import _remove_tensor_list_mmap_fds
     except Exception as e:
@@ -333,6 +334,7 @@ if load_noavx:
         from .core_noavx import _set_process_signal_handler
         from .core_noavx import _throw_error_if_process_failed
         from .core_noavx import _convert_to_tensor_list
+        from .core_noavx import _array_to_share_memory_tensor
         from .core_noavx import _cleanup_mmap_fds
         from .core_noavx import _remove_tensor_list_mmap_fds
     except Exception as e:
diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index 0cd12e874d9..167c7987c55 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -166,7 +166,9 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
                 # pack as LoDTensorArray
                 array = core.LoDTensorArray()
                 for slot in batch:
-                    if not isinstance(slot, core.LoDTensor):
+                    if isinstance(slot, paddle.Tensor):
+                        slot = slot.value().get_tensor()
+                    elif not isinstance(slot, core.LoDTensor):
                         tmp = core.LoDTensor()
                         tmp.set(slot, core.CPUPlace())
                         slot = tmp
@@ -388,7 +390,9 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
                     # LoDTensor not in shared memory is not
                     # serializable, cannot be create in workers
                     for slot in batch:
-                        if not isinstance(slot, core.LoDTensor):
+                        if isinstance(slot, paddle.Tensor):
+                            slot = slot.value().get_tensor()
+                        elif not isinstance(slot, core.LoDTensor):
                             tmp = core.LoDTensor()
                             tmp.set(slot, core.CPUPlace())
                             slot = tmp
diff --git a/python/paddle/fluid/dataloader/flat.py b/python/paddle/fluid/dataloader/flat.py
index 6cccbc7ee4e..db3a725ece0 100644
--- a/python/paddle/fluid/dataloader/flat.py
+++ b/python/paddle/fluid/dataloader/flat.py
@@ -36,14 +36,10 @@ def _flatten_batch(batch):
     def _flatten(batch, flat_batch, structure, field_idx):
         if isinstance(batch, Sequence):
             for field in batch:
-                if isinstance(field, np.ndarray):
+                if isinstance(field, (np.ndarray, paddle.Tensor)):
                     structure.append('{}{}'.format(FIELD_PREFIX, field_idx))
                     flat_batch.append(field)
                     field_idx += 1
-                elif isinstance(field, paddle.Tensor):
-                    structure.append('{}{}'.format(FIELD_PREFIX, field_idx))
-                    flat_batch.append(field.numpy())
-                    field_idx += 1
                 elif isinstance(field, (str, bytes, numbers.Number)):
                     structure.append(field)
                 elif isinstance(field, Sequence):
@@ -58,14 +54,10 @@ def _flatten_batch(batch):
             structure.append(field)
         elif isinstance(batch, Mapping):
             for k, field in batch.items():
-                if isinstance(field, np.ndarray):
+                if isinstance(field, (np.ndarray, paddle.Tensor)):
                     structure[k] = '{}{}'.format(FIELD_PREFIX, field_idx)
                     flat_batch.append(field)
                     field_idx += 1
-                elif isinstance(field, paddle.Tensor):
-                    structure[k] = '{}{}'.format(FIELD_PREFIX, field_idx)
-                    flat_batch.append(field.numpy())
-                    field_idx += 1
                 elif isinstance(field, (str, bytes, numbers.Number)):
                     structure[k] = field
                 elif isinstance(field, Sequence):
diff --git a/python/paddle/fluid/dataloader/worker.py b/python/paddle/fluid/dataloader/worker.py
index 2d1b554e53d..26bd1f06e12 100644
--- a/python/paddle/fluid/dataloader/worker.py
+++ b/python/paddle/fluid/dataloader/worker.py
@@ -238,7 +238,11 @@ def _worker_loop(dataset, dataset_kind, indices_queue, out_queue, done_event,
                     out_queue.put((idx, batch, None))
                 batch, structure = _flatten_batch(batch)
                 if use_shared_memory:
-                    tensor_list = core._convert_to_tensor_list(batch)
+                    tensor_list = [
+                        core._array_to_share_memory_tensor(b)
+                        if isinstance(b, np.ndarray) else b._share_memory()
+                        for b in batch
+                    ]
                     out_queue.put((idx, tensor_list, structure))
                     core._remove_tensor_list_mmap_fds(tensor_list)
                 else:
--
GitLab
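With this patch, _flatten_batch keeps paddle.Tensor fields instead of converting them to numpy, and the worker loop shares every flattened field through shared memory: ndarrays go through core._array_to_share_memory_tensor and tensors through the new Tensor._share_memory binding. The sketch below is a minimal illustration of the resulting pipeline; it assumes the public paddle.io.Dataset / paddle.io.DataLoader API, and the dataset class, shapes, and batch size are illustrative rather than part of this patch.

import numpy as np
import paddle
from paddle.io import DataLoader, Dataset


class RandomDataset(Dataset):
    # Illustrative dataset: samples are returned as paddle.Tensor objects,
    # which this patch lets flow through the DataLoader pipeline unchanged.
    def __init__(self, num_samples=100):
        self.num_samples = num_samples

    def __getitem__(self, idx):
        image = paddle.to_tensor(np.random.random([784]).astype('float32'))
        label = paddle.to_tensor(np.random.randint(0, 10, [1]))
        return image, label

    def __len__(self):
        return self.num_samples


# num_workers > 0 with use_shared_memory left at its default exercises the
# worker path above: each flattened field is placed in shared memory via
# _array_to_share_memory_tensor (ndarray) or _share_memory (paddle.Tensor).
loader = DataLoader(RandomDataset(), batch_size=16, num_workers=2)
for image, label in loader():
    pass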