未验证 提交 e973bd73 编写于 作者: K Kaipeng Deng 提交者: GitHub

Polish tensor pipeline (#31701)

* polish tensor pipeline. test=develop
上级 495e7f9c
......@@ -494,6 +494,39 @@ void BindImperative(py::module *m_ptr) {
},
py::return_value_policy::take_ownership);
m.def("_array_to_share_memory_tensor",
[](py::object &obj) {
// 1. cast to python array
auto array = obj.cast<py::array>();
PADDLE_ENFORCE_NE(
string::Sprintf("%s", array.dtype()).compare("object"), 0,
platform::errors::InvalidArgument(
"Faild to convert input data to a regular ndarray.\n * "
"Usually this means the input data contains nested "
"lists with different lengths.\n * Check the reader "
"function passed to 'set_(sample/sample_list/batch)"
"_generator' to locate the data causes this issue."));
// 2. construcct LoDTensor
framework::LoDTensor t;
SetTensorFromPyArray<platform::CPUPlace>(&t, array,
platform::CPUPlace(), true);
// 3. allocate shared memory
void *data_ptr = t.data<void>();
size_t data_size = t.numel() * framework::SizeOfType(t.type());
auto shared_writer_holder =
memory::allocation::AllocateMemoryMapWriterAllocation(data_size);
// 4. maintain mmap fd set & backup ipc_name
const std::string &ipc_name = shared_writer_holder->ipc_name();
memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name);
// 5. copy data & reset holder
memory::Copy(platform::CPUPlace(), shared_writer_holder->ptr(),
platform::CPUPlace(), data_ptr, data_size);
t.ResetHolder(shared_writer_holder);
return t;
},
py::return_value_policy::take_ownership);
m.def("_remove_tensor_list_mmap_fds", [](py::list &tensor_list) {
for (size_t i = 0; i < tensor_list.size(); ++i) {
auto t = tensor_list[i].cast<framework::LoDTensor>();
......@@ -1111,6 +1144,35 @@ void BindImperative(py::module *m_ptr) {
y = x.cuda(1)
print(y.place) # CUDAPlace(1)
)DOC")
.def("_share_memory",
[](const std::shared_ptr<imperative::VarBase> &self) {
#ifndef _WIN32
PADDLE_ENFORCE_EQ(
platform::is_cpu_place(self->Place()), true,
platform::errors::InvalidArgument(
"Sharing memory only support CPU Tensor currently"));
// 1. get LoDTensor
auto *t = self->MutableVar()->GetMutable<framework::LoDTensor>();
// 2. allocate shared memory
void *data_ptr = t->data<void>();
size_t data_size = t->numel() * framework::SizeOfType(t->type());
auto shared_writer_holder =
memory::allocation::AllocateMemoryMapWriterAllocation(
data_size);
// 3. maintain mmap fd set & backup ipc_name
const std::string &ipc_name = shared_writer_holder->ipc_name();
memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name);
// 4. copy data & reset holder
memory::Copy(platform::CPUPlace(), shared_writer_holder->ptr(),
platform::CPUPlace(), data_ptr, data_size);
t->ResetHolder(shared_writer_holder);
return *t;
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Sharing memory in Windows OS is not supported currently"));
#endif
},
py::return_value_policy::reference)
.def("copy_", &imperative::VarBase::CopyFrom)
.def("_copy_to",
[](const std::shared_ptr<imperative::VarBase> &self,
......
......@@ -279,6 +279,7 @@ if avx_supported():
from .core_avx import _set_process_signal_handler
from .core_avx import _throw_error_if_process_failed
from .core_avx import _convert_to_tensor_list
from .core_avx import _array_to_share_memory_tensor
from .core_avx import _cleanup_mmap_fds
from .core_avx import _remove_tensor_list_mmap_fds
except Exception as e:
......@@ -333,6 +334,7 @@ if load_noavx:
from .core_noavx import _set_process_signal_handler
from .core_noavx import _throw_error_if_process_failed
from .core_noavx import _convert_to_tensor_list
from .core_noavx import _array_to_share_memory_tensor
from .core_noavx import _cleanup_mmap_fds
from .core_noavx import _remove_tensor_list_mmap_fds
except Exception as e:
......
......@@ -166,7 +166,9 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
# pack as LoDTensorArray
array = core.LoDTensorArray()
for slot in batch:
if not isinstance(slot, core.LoDTensor):
if isinstance(slot, paddle.Tensor):
slot = slot.value().get_tensor()
elif not isinstance(slot, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(slot, core.CPUPlace())
slot = tmp
......@@ -388,7 +390,9 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
# LoDTensor not in shared memory is not
# serializable, cannot be create in workers
for slot in batch:
if not isinstance(slot, core.LoDTensor):
if isinstance(slot, paddle.Tensor):
slot = slot.value().get_tensor()
elif not isinstance(slot, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(slot, core.CPUPlace())
slot = tmp
......
......@@ -36,14 +36,10 @@ def _flatten_batch(batch):
def _flatten(batch, flat_batch, structure, field_idx):
if isinstance(batch, Sequence):
for field in batch:
if isinstance(field, np.ndarray):
if isinstance(field, (np.ndarray, paddle.Tensor)):
structure.append('{}{}'.format(FIELD_PREFIX, field_idx))
flat_batch.append(field)
field_idx += 1
elif isinstance(field, paddle.Tensor):
structure.append('{}{}'.format(FIELD_PREFIX, field_idx))
flat_batch.append(field.numpy())
field_idx += 1
elif isinstance(field, (str, bytes, numbers.Number)):
structure.append(field)
elif isinstance(field, Sequence):
......@@ -58,14 +54,10 @@ def _flatten_batch(batch):
structure.append(field)
elif isinstance(batch, Mapping):
for k, field in batch.items():
if isinstance(field, np.ndarray):
if isinstance(field, (np.ndarray, paddle.Tensor)):
structure[k] = '{}{}'.format(FIELD_PREFIX, field_idx)
flat_batch.append(field)
field_idx += 1
elif isinstance(field, paddle.Tensor):
structure[k] = '{}{}'.format(FIELD_PREFIX, field_idx)
flat_batch.append(field.numpy())
field_idx += 1
elif isinstance(field, (str, bytes, numbers.Number)):
structure[k] = field
elif isinstance(field, Sequence):
......
......@@ -238,7 +238,11 @@ def _worker_loop(dataset, dataset_kind, indices_queue, out_queue, done_event,
out_queue.put((idx, batch, None))
batch, structure = _flatten_batch(batch)
if use_shared_memory:
tensor_list = core._convert_to_tensor_list(batch)
tensor_list = [
core._array_to_share_memory_tensor(b)
if isinstance(b, np.ndarray) else b._share_memory()
for b in batch
]
out_queue.put((idx, tensor_list, structure))
core._remove_tensor_list_mmap_fds(tensor_list)
else:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册