Unverified commit 7d8d5734, authored by Chen Weihang, committed by GitHub

Speed up dygraph DataLoader based on shared memory and LoDTensor serialization (#22541)

* add lodtensor share memory & serialization, test=develop

* fix windows compile error, test=develop

* deal vartype pickle & fix unittest matching error message, test=develop

* update timeout variable name, test=develop

* refactor memory map implement, test=develop

* clear mmap file descriptor when exiting unexpectedly, test=develop

* remove the child process fd in advance, test=develop

* remove mmap fds after Queue.put in child process, test=develop

* add hard unittests for register exit func, test=develop

* fix python2 compatibility problem in unittest, test=develop

* fix exception unittest error, test=develop

* polish code based on review comments, test=develop
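From the user's perspective nothing changes: the multiprocess mode of the dygraph DataLoader simply moves batches through POSIX shared memory instead of pickling raw ndarray bytes through the queue. A minimal usage sketch of the accelerated path, mirroring the unit tests added below (Linux only, since the shared-memory path is compiled out on Windows):

import numpy as np
import paddle.fluid as fluid

def batch_generator():
    for _ in range(100):
        # each batch becomes shared-memory-backed LoDTensors in the reader child process
        yield np.random.random([8, 784]).astype('float32'), \
              np.random.random([8, 1]).astype('int64')

with fluid.dygraph.guard(fluid.CPUPlace()):
    loader = fluid.io.DataLoader.from_generator(capacity=50, use_multiprocess=True)
    loader.set_batch_generator(batch_generator, places=fluid.CPUPlace())
    for image, label in loader():
        out = fluid.layers.relu(image)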
Parent 324f2b39
...@@ -66,3 +66,8 @@ cc_test(allocator_facade_frac_flags_test SRCS allocator_facade_frac_flags_test.c
cc_library(auto_growth_best_fit_allocator SRCS auto_growth_best_fit_allocator.cc DEPS allocator aligned_allocator)
cc_test(auto_growth_best_fit_allocator_facade_test SRCS auto_growth_best_fit_allocator_facade_test.cc DEPS cpu_allocator auto_growth_best_fit_allocator)
cc_test(auto_growth_best_fit_allocator_test SRCS auto_growth_best_fit_allocator_test.cc DEPS auto_growth_best_fit_allocator)
if(NOT WIN32)
cc_library(mmap_allocator SRCS mmap_allocator.cc DEPS allocator)
cc_test(mmap_allocator_test SRCS mmap_allocator_test.cc DEPS mmap_allocator allocator)
endif(NOT WIN32)
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _WIN32
#include "paddle/fluid/memory/allocation/mmap_allocator.h"
#include <fcntl.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <random>
#include <string>
#include <utility>
namespace paddle {
namespace memory {
namespace allocation {
MemoryMapWriterAllocation::~MemoryMapWriterAllocation() {
PADDLE_ENFORCE_NE(
munmap(this->ptr(), this->size()), -1,
platform::errors::Unavailable("could not unmap the shared memory file %s",
this->ipc_name()));
}
MemoryMapReaderAllocation::~MemoryMapReaderAllocation() {
PADDLE_ENFORCE_NE(
munmap(this->ptr(), this->size()), -1,
platform::errors::Unavailable("could not unmap the shared memory file %s",
this->ipc_name()));
/* Here we do not pay attention to the result of shm_unlink,
because the memory mapped file may have been cleared due to the
MemoryMapFdSet::Clear() */
shm_unlink(this->ipc_name().c_str());
MemoryMapFdSet::Instance().Remove(this->ipc_name());
VLOG(3) << "~MemoryMapReaderAllocation: " << this->ipc_name();
}
std::string GetIPCName() {
static std::random_device rd;
std::string handle = "/paddle_";
#ifdef _WIN32
handle += std::to_string(GetCurrentProcessId());
#else
handle += std::to_string(getpid());
#endif
handle += "_";
handle += std::to_string(rd());
return handle;
}
std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
size_t size) {
const std::string &ipc_name = GetIPCName();
int flags = O_RDWR | O_CREAT;
int fd = shm_open(ipc_name.c_str(), flags, 0644);
PADDLE_ENFORCE_NE(
fd, -1, platform::errors::Unavailable("Failed to open shared memory file %s",
ipc_name.c_str()));
PADDLE_ENFORCE_EQ(ftruncate(fd, size), 0,
platform::errors::Unavailable(
"Fruncate a file to a specified length failed!"));
void *ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
PADDLE_ENFORCE_NE(ptr, MAP_FAILED,
platform::errors::Unavailable(
"Memory map failed when create shared memory."));
close(fd);
return std::make_shared<MemoryMapWriterAllocation>(ptr, size, ipc_name);
}
std::shared_ptr<MemoryMapReaderAllocation> RebuildMemoryMapReaderAllocation(
const std::string &ipc_name, size_t size) {
int fd = shm_open(ipc_name.c_str(), O_RDONLY, 0644);
PADDLE_ENFORCE_NE(
fd, -1, platform::errors::Unavailable("Failed to open shared memory file %s",
ipc_name.c_str()));
void *ptr = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
PADDLE_ENFORCE_NE(ptr, MAP_FAILED,
platform::errors::Unavailable(
"Memory map failed when rebuild shared memory."));
close(fd);
return std::make_shared<MemoryMapReaderAllocation>(ptr, size, ipc_name);
}
MemoryMapFdSet &MemoryMapFdSet::Instance() { // NOLINT
static MemoryMapFdSet set;
return set;
}
void MemoryMapFdSet::Insert(const std::string &ipc_name) {
std::lock_guard<std::mutex> guard(mtx_);
fd_set_.emplace(ipc_name);
VLOG(3) << "PID: " << getpid() << ", MemoryMapFdSet: insert " << ipc_name
<< ", set size: " << fd_set_.size();
}
void MemoryMapFdSet::Remove(const std::string &ipc_name) {
std::lock_guard<std::mutex> guard(mtx_);
fd_set_.erase(ipc_name);
VLOG(3) << "PID: " << getpid() << ", MemoryMapFdSet: erase " << ipc_name
<< ", set size: " << fd_set_.size();
}
void MemoryMapFdSet::Clear() {
VLOG(3) << "PID: " << getpid() << ", MemoryMapFdSet: set size - "
<< fd_set_.size();
std::lock_guard<std::mutex> guard(mtx_);
for (auto fd : fd_set_) {
int rlt = shm_unlink(fd.c_str());
if (rlt == 0) {
VLOG(3) << "PID: " << getpid() << ", MemoryMapFdSet: clear " << fd;
}
}
fd_set_.clear();
}
MemoryMapFdSet::~MemoryMapFdSet() { Clear(); }
} // namespace allocation
} // namespace memory
} // namespace paddle
#endif
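For readers unfamiliar with the POSIX shared-memory calls used above, the same writer/reader handshake can be sketched with Python's standard library. This is an illustrative analogy, not Paddle code; the file name is hypothetical and a file under /dev/shm plays the role of ipc_name:

import mmap
import os
import struct

name = "/dev/shm/paddle_demo_%d" % os.getpid()       # hypothetical ipc_name
fd = os.open(name, os.O_RDWR | os.O_CREAT, 0o644)    # like shm_open(O_RDWR | O_CREAT)
os.ftruncate(fd, 4096)                               # size the segment, like ftruncate()
wbuf = mmap.mmap(fd, 4096)                           # like mmap(PROT_READ | PROT_WRITE)
os.close(fd)                                         # the fd can be closed once mapped
wbuf[:4] = struct.pack("i", 42)                      # writer fills the segment

# a reader (normally another process) re-opens the same name read-only
fd = os.open(name, os.O_RDONLY)                      # like shm_open(O_RDONLY)
rbuf = mmap.mmap(fd, 4096, prot=mmap.PROT_READ)      # like mmap(PROT_READ)
os.close(fd)
assert struct.unpack("i", rbuf[:4])[0] == 42
os.unlink(name)                                      # like shm_unlink()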
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifndef _WIN32
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <unordered_set>
#include <utility>
#include "paddle/fluid/memory/allocation/allocator.h"
namespace paddle {
namespace memory {
namespace allocation {
class MemoryMapWriterAllocation : public Allocation {
public:
explicit MemoryMapWriterAllocation(void *ptr, size_t size,
std::string ipc_name)
: Allocation(ptr, size, platform::CPUPlace()),
ipc_name_(std::move(ipc_name)) {}
inline const std::string &ipc_name() const { return ipc_name_; }
~MemoryMapWriterAllocation() override;
private:
std::string ipc_name_;
};
class MemoryMapReaderAllocation : public Allocation {
public:
explicit MemoryMapReaderAllocation(void *ptr, size_t size,
std::string ipc_name)
: Allocation(ptr, size, platform::CPUPlace()),
ipc_name_(std::move(ipc_name)) {}
inline const std::string &ipc_name() const { return ipc_name_; }
~MemoryMapReaderAllocation() override;
private:
std::string ipc_name_;
};
std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
size_t size);
std::shared_ptr<MemoryMapReaderAllocation> RebuildMemoryMapReaderAllocation(
const std::string &ipc_name, size_t size);
class MemoryMapFdSet {
public:
static MemoryMapFdSet &Instance(); // NOLINT
void Insert(const std::string &ipc_name);
void Remove(const std::string &ipc_name);
void Clear();
~MemoryMapFdSet();
private:
MemoryMapFdSet() = default;
std::unordered_set<std::string> fd_set_;
std::mutex mtx_;
};
} // namespace allocation
} // namespace memory
} // namespace paddle
#endif
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _WIN32
#include "paddle/fluid/memory/allocation/mmap_allocator.h"
#include <sys/types.h>
#include "gtest/gtest.h"
namespace paddle {
namespace memory {
namespace allocation {
TEST(MemoryMapAllocation, test_allocation_base) {
size_t data_size = 4UL * 1024;
// 1. allocate writer holder
auto mmap_writer_holder = AllocateMemoryMapWriterAllocation(data_size);
std::string ipc_name = mmap_writer_holder->ipc_name();
// 2. write data
auto* writer_ptr = static_cast<int32_t*>(mmap_writer_holder->ptr());
for (int32_t i = 0; i < 1024; ++i) {
writer_ptr[i] = i;
}
// 3. create child process
pid_t fpid = fork();
if (fpid == 0) {
// 4. rebuild reader holder
auto mmap_reader_holder =
RebuildMemoryMapReaderAllocation(ipc_name, data_size);
auto* reader_ptr = static_cast<int32_t*>(mmap_reader_holder->ptr());
for (int32_t i = 0; i < 1024; ++i) {
ASSERT_EQ(reader_ptr[i], i);
}
}
}
} // namespace allocation
} // namespace memory
} // namespace paddle
#endif
...@@ -9,6 +9,7 @@ endif()
if(NOT WIN32)
set(PYBIND_DEPS ${PYBIND_DEPS} data_loader)
set(PYBIND_DEPS ${PYBIND_DEPS} mmap_allocator)
if (WITH_NCCL)
set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context)
endif()
...
...@@ -31,6 +31,7 @@ limitations under the License. */
#include "paddle/fluid/imperative/profiler.h"
#include "paddle/fluid/imperative/tracer.h"
#include "paddle/fluid/imperative/type_defs.h"
#include "paddle/fluid/memory/allocation/mmap_allocator.h"
#include "paddle/fluid/pybind/op_function.h" #include "paddle/fluid/pybind/op_function.h"
#include "paddle/fluid/pybind/pybind_boost_headers.h" #include "paddle/fluid/pybind/pybind_boost_headers.h"
#include "paddle/fluid/pybind/tensor_py.h" #include "paddle/fluid/pybind/tensor_py.h"
...@@ -220,6 +221,85 @@ void BindImperative(py::module *m_ptr) { ...@@ -220,6 +221,85 @@ void BindImperative(py::module *m_ptr) {
BindOpFunctions(&m); BindOpFunctions(&m);
#ifndef _WIN32
// Dygraph DataLoader signal handler
m.def("_set_process_pid", [](int64_t key, pid_t pid) {
imperative::SetLoadProcessPID(key, pid);
});
m.def("_erase_process_pid",
[](int64_t key) { imperative::EraseLoadProcessPID(key); });
m.def("_set_process_signal_handler",
[]() { imperative::SetLoadProcessSignalHandler(); });
m.def("_throw_error_if_process_failed",
[]() { imperative::ThrowErrorIfLoadProcessFailed(); });
// Dygraph DataLoader reader process & thread related functions
m.def(
"_convert_to_tensor_list",
[](py::object &obj) -> py::list {
// 0. input data check
PADDLE_ENFORCE(
py::isinstance<py::tuple>(obj) || py::isinstance<py::list>(obj),
platform::errors::InvalidArgument(
"The batch data read into DataLoader is illegal."
"Expected data type is tuple or list, but received %s",
obj.get_type()));
py::list batch = py::cast<py::list>(obj);
py::list tensors;
for (size_t i = 0; i < batch.size(); ++i) {
// 1. cast to python array
auto array = batch[i].cast<py::array>();
PADDLE_ENFORCE_NE(
string::Sprintf("%s", array.dtype()).compare("object"), 0,
platform::errors::InvalidArgument(
"Faild to convert input data to a regular ndarray.\n * "
"Usually this means the input data contains nested "
"lists with different lengths.\n * Check the reader "
"function passed to 'set_(sample/sample_list/batch)"
"_generator' to locate the data causes this issue."));
// 2. construct LoDTensor
framework::LoDTensor t;
SetTensorFromPyArray<platform::CPUPlace>(&t, array,
platform::CPUPlace(), true);
// 3. allocate shared memory
void *data_ptr = t.data<void>();
size_t data_size = t.numel() * framework::SizeOfType(t.type());
auto shared_writer_holder =
memory::allocation::AllocateMemoryMapWriterAllocation(data_size);
// 4. maintain mmap fd set & backup ipc_name
const std::string &ipc_name = shared_writer_holder->ipc_name();
memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name);
// 5. copy data & reset holder
memory::Copy(platform::CPUPlace(), shared_writer_holder->ptr(),
platform::CPUPlace(), data_ptr, data_size);
t.ResetHolder(shared_writer_holder);
// 6. append to result list
tensors.append(t);
}
return tensors;
},
py::return_value_policy::take_ownership);
m.def("_remove_tensor_list_mmap_fds", [](py::list &tensor_list) {
for (size_t i = 0; i < tensor_list.size(); ++i) {
auto t = tensor_list[i].cast<framework::LoDTensor>();
auto *mmap_writer_allocation =
dynamic_cast<memory::allocation::MemoryMapWriterAllocation *>(
t.Holder().get());
PADDLE_ENFORCE_NOT_NULL(
mmap_writer_allocation,
platform::errors::NotFound("The shared memory of LoDTensor in "
"DataLoader's child process has been "
"released."));
memory::allocation::MemoryMapFdSet::Instance().Remove(
mmap_writer_allocation->ipc_name());
}
});
m.def("_cleanup_mmap_fds",
[]() { memory::allocation::MemoryMapFdSet::Instance().Clear(); });
#endif
py::class_<imperative::detail::BackwardStrategy> backward_strategy(
m, "BackwardStrategy", R"DOC(
...@@ -277,19 +357,6 @@ void BindImperative(py::module *m_ptr) {
imperative::SetCurrentTracer(tracer);
});
#ifndef _WIN32
// Dygraph DataLoader signal handler
m.def("_set_process_pid", [](int64_t key, pid_t pid) {
imperative::SetLoadProcessPID(key, pid);
});
m.def("_erase_process_pid",
[](int64_t key) { imperative::EraseLoadProcessPID(key); });
m.def("_set_process_signal_handler",
[]() { imperative::SetLoadProcessSignalHandler(); });
m.def("_throw_error_if_process_failed",
[]() { imperative::ThrowErrorIfLoadProcessFailed(); });
#endif
py::class_<imperative::VarBase, std::shared_ptr<imperative::VarBase>>(
m, "VarBase",
R"DOC()DOC")
...
...@@ -47,6 +47,7 @@ limitations under the License. */
#include "paddle/fluid/framework/version.h"
#include "paddle/fluid/imperative/layer.h"
#include "paddle/fluid/memory/allocation/allocator_strategy.h"
#include "paddle/fluid/memory/allocation/mmap_allocator.h"
#include "paddle/fluid/operators/activation_op.h" #include "paddle/fluid/operators/activation_op.h"
#include "paddle/fluid/operators/py_func_op.h" #include "paddle/fluid/operators/py_func_op.h"
#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" #include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
...@@ -860,7 +861,58 @@ PYBIND11_MODULE(core_noavx, m) { ...@@ -860,7 +861,58 @@ PYBIND11_MODULE(core_noavx, m) {
} }
dst.set_lod(self.lod()); dst.set_lod(self.lod());
return dst; return dst;
#ifdef _WIN32
});
#else
})
.def(py::pickle(
[](const LoDTensor &t) { // __getstate__
auto holder = t.Holder();
PADDLE_ENFORCE_EQ(
platform::is_cpu_place(holder->place()), true,
platform::errors::PreconditionNotMet(
"LoDTensor is not on CPU."
"Now only LoDTensor on CPU can be serialized."));
auto* mmap_writer_allocation =
dynamic_cast<memory::allocation::MemoryMapWriterAllocation *>(
holder.get());
PADDLE_ENFORCE_NOT_NULL(mmap_writer_allocation,
platform::errors::PreconditionNotMet(
"LoDTensor is not in shared memory."
"Now only LoDTensor on shared memory can be serialized."));
int type_idx = static_cast<int>(t.type());
return py::make_tuple(mmap_writer_allocation->ipc_name(),
mmap_writer_allocation->size(),
type_idx, vectorize(t.dims()), t.lod());
},
[](py::tuple t) { // __setstate__
if (t.size() != 5)
throw std::runtime_error("Invalid LoDTensor state!");
// 1. Create a new C++ instance
LoDTensor tensor;
// 2. Rebuild Allocation
const std::string &ipc_name = t[0].cast<std::string>();
size_t size = t[1].cast<size_t>();
auto shared_reader_holder =
memory::allocation::RebuildMemoryMapReaderAllocation(
ipc_name, size);
// 3. Maintain global fd set
VLOG(3) << "LoDTensor ipc name: " << ipc_name;
memory::allocation::MemoryMapFdSet::Instance().Insert(ipc_name);
// 4. Rebuild LoDTensor
tensor.ResetHolderWithType(shared_reader_holder,
static_cast<proto::VarType::Type>(t[2].cast<int>()));
tensor.Resize(make_ddim(t[3].cast<std::vector<int>>()));
tensor.set_lod(t[4].cast<framework::LoD>());
return tensor;
}));
#endif
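Taken together with _convert_to_tensor_list, the pickle methods above let a LoDTensor cross a multiprocessing.Queue by name rather than by value: __getstate__ ships only (ipc_name, size, dtype, dims, lod), and __setstate__ re-maps the same /dev/shm segment. A hedged round-trip sketch using the private bindings added in this PR (Linux only, internal API, not a supported public interface):

import multiprocessing
import numpy as np
from paddle.fluid import core

def child(q):
    batch = [np.arange(8, dtype='float32'), np.zeros([8, 1], dtype='int64')]
    tensor_list = core._convert_to_tensor_list(batch)  # copies into shared memory
    q.put(tensor_list)            # pickled as (ipc_name, size, dtype, dims, lod)
    core._remove_tensor_list_mmap_fds(tensor_list)     # drop child-side fd records

if __name__ == '__main__':
    q = multiprocessing.Queue(1)
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    tensors = q.get()             # __setstate__ re-mmaps the same segment by name
    print(np.array(tensors[0]))   # expected: [0. 1. 2. 3. 4. 5. 6. 7.]
    p.join()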
py::class_<SelectedRows>(m, "SelectedRows")
.def("__init__",
...
...@@ -189,6 +189,9 @@ if avx_supported():
from .core_avx import _erase_process_pid
from .core_avx import _set_process_signal_handler
from .core_avx import _throw_error_if_process_failed
from .core_avx import _convert_to_tensor_list
from .core_avx import _cleanup_mmap_fds
from .core_avx import _remove_tensor_list_mmap_fds
except Exception as e:
if has_avx_core:
raise e
...@@ -230,6 +233,9 @@ if load_noavx:
from .core_noavx import _erase_process_pid
from .core_noavx import _set_process_signal_handler
from .core_noavx import _throw_error_if_process_failed
from .core_noavx import _convert_to_tensor_list
from .core_noavx import _cleanup_mmap_fds
from .core_noavx import _remove_tensor_list_mmap_fds
except Exception as e:
if has_noavx_core:
sys.stderr.write(
...
...@@ -27,6 +27,8 @@ import logging
from .dataset import DatasetBase, InMemoryDataset
### Dygraph DataLoader configs ###
import atexit
import os
import multiprocessing
import signal
# NOTE: queue has a different name in python2 and python3
...@@ -34,9 +36,13 @@ if sys.version_info[0] == 2:
import Queue as queue
else:
import queue
# NOTE: [ avoid hanging & failed quickly ] These values are used in getting data from another process
QUEUE_GET_TIMEOUT = 60
MAX_GET_FAILED_TIME = 12
# NOTE: [ mmap files clear ] If there is still data in the multiprocess queue when the main process finishes reading,
# the data in the queue needs to be popped so that the LoDTensors the main process read
# from the child process can automatically clear their memory-mapped files.
multiprocess_queue_set = set()
__all__ = ['PyReader', 'DataLoader']
...@@ -58,6 +64,82 @@ def _convert_places(places):
return ret
def _clear_multiprocess_queue_set():
global multiprocess_queue_set
for data_queue in multiprocess_queue_set:
while True:
try:
data_queue.get_nowait()
except queue.Empty:
break
# NOTE: main process clear function at exit
def _cleanup():
# NOTE: inter-process Queue shared memory objects clear function
_clear_multiprocess_queue_set()
# NOTE: main process memory map files clear function
core._cleanup_mmap_fds()
# NOTE: used to register a function to be executed at interpreter exit.
class CleanupFuncRegistrar():
# Record the cleanup functions that have been executed
_executed_func_set = set()
# Record the cleanup functions that have been registered
_registered_func_set = set()
@classmethod
def register(cls, function, signals=[signal.SIGTERM]):
def _func_exectuor():
if function not in cls._executed_func_set:
try:
function()
finally:
cls._executed_func_set.add(function)
def _func_register(function):
if not callable(function):
raise TypeError("%s is not callable object." % (function))
# check whether the function object is hashable
set([function])
if function not in cls._registered_func_set:
atexit.register(_func_exectuor)
cls._registered_func_set.add(function)
def _signal_handler(signum=None, frame=None):
_func_exectuor()
if signum is not None:
if signum == signal.SIGINT:
raise KeyboardInterrupt
sys.exit(signum)
def _signal_register(signals):
signals = set(signals)
for sig in signals:
orig_handler = signal.signal(sig, _signal_handler)
if orig_handler not in (signal.SIG_DFL, signal.SIG_IGN):
if (sig == signal.SIGINT and
orig_handler is signal.default_int_handler):
continue
if orig_handler not in cls._registered_func_set:
atexit.register(orig_handler)
cls._registered_func_set.add(orig_handler)
# deal with signals
_signal_register(signals)
# deal with function
_func_register(function)
# NOTE: [ mmap files clear ] When the main process exits unexpectedly, the remaining
# shared memory objects in the inter-process Queue and the main process (mostly in the
# BlockingQueue) may not be completely released, resulting in the corresponding
# memory-mapped files remaining in /dev/shm, so this function is registered
# to clean up shared memory objects in these two queues before the python interpreter exits.
CleanupFuncRegistrar.register(_cleanup)
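The registrar is generic, so other cleanup hooks can be attached the same way. A small hedged example with a hypothetical user function (a registered hook runs at most once, at interpreter exit or on the listed signals):

import signal
from paddle.fluid.reader import CleanupFuncRegistrar

def _remove_scratch_files():  # hypothetical cleanup hook
    print("removing scratch files before exit")

CleanupFuncRegistrar.register(_remove_scratch_files, signals=[signal.SIGTERM])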
class DataLoaderBase(object):
def __init__(self):
self._places = None
...@@ -383,6 +465,16 @@ class DygraphGeneratorLoader(DataLoaderBase):
def iterable(self):
return self._iterable
def _clear_and_remove_data_queue(self):
if self._data_queue is not None:
while True:
try:
self._data_queue.get_nowait()
except queue.Empty:
break
global multiprocess_queue_set
multiprocess_queue_set.remove(self._data_queue)
def _wait_thread_ends(self):
thread = self._thread
if thread is not None:
...@@ -392,12 +484,25 @@ class DygraphGeneratorLoader(DataLoaderBase):
def _wait_process_ends(self):
process = self._process
if process is not None:
self._data_queue.cancel_join_thread()
self._data_queue.close()
process.join()
# erase process id
core._erase_process_pid(id(self))
def _set_child_signal_handler(self):
core._set_process_pid(id(self), self._process.pid)
current_handler = signal.getsignal(signal.SIGCHLD)
if not callable(current_handler):
current_handler = None
def __handler__(signum, frame):
# NOTE: Here the signum is SIGCHLD; this handler will be called whenever
# the child process exits, normally or abnormally.
core._throw_error_if_process_failed()
if current_handler is not None:
current_handler(signum, frame)
signal.signal(signal.SIGCHLD, __handler__)
def _init_iterable(self):
self._wait_thread_ends()
if self._use_multiprocess:
...@@ -414,8 +519,13 @@ class DygraphGeneratorLoader(DataLoaderBase):
def _start(self):
if self._use_multiprocess:
# clear old _data_queue and remove it from multiprocess_queue_set
self._clear_and_remove_data_queue()
# set data_queue and process
self._data_queue = multiprocessing.Queue(self._capacity)
# add _data_queue into global queue set
global multiprocess_queue_set
multiprocess_queue_set.add(self._data_queue)
self._process = multiprocessing.Process(
target=self._reader_process_loop)
self._process.daemon = True
...@@ -473,28 +583,13 @@ class DygraphGeneratorLoader(DataLoaderBase):
" to locate the data causes this issue.\n\t* Please consider using "
"'fluid.create_lod_tensor' to convert it to a LoD-Tensor.")
def _set_child_signal_handler(self):
core._set_process_pid(id(self), self._process.pid)
current_handler = signal.getsignal(signal.SIGCHLD)
if not callable(current_handler):
current_handler = None
def __handler__(signum, frame):
core._throw_error_if_process_failed()
if current_handler is not None:
current_handler(signum, frame)
signal.signal(signal.SIGCHLD, __handler__)
def _exit_thread_expectedly(self):
self._thread_done_event.set()
self._blocking_queue.close()
self._data_queue.close()
def _exit_thread_unexpectedly(self):
self._thread_done_event.set()
self._blocking_queue.kill()
self._data_queue.close()
logging.error("DataLoader reader thread raised an exception!") logging.error("DataLoader reader thread raised an exception!")
def _reader_process_loop(self): def _reader_process_loop(self):
...@@ -502,23 +597,29 @@ class DygraphGeneratorLoader(DataLoaderBase): ...@@ -502,23 +597,29 @@ class DygraphGeneratorLoader(DataLoaderBase):
# set signal handler # set signal handler
core._set_process_signal_handler() core._set_process_signal_handler()
# child process clear function at exit
def _cleanup():
# clear memory map files in child process
core._cleanup_mmap_fds()
# NOTE: [ mmap files clear ] When the child process exits unexpectedly,
# some shared memory objects may have been applied for but have not yet
# been put into the inter-process Queue. This part of the object needs
# to be cleaned up when the process ends.
CleanupFuncRegistrar.register(_cleanup)
for batch in self._batch_reader():
tensor_list = core._convert_to_tensor_list(batch)
self._data_queue.put(tensor_list)
core._remove_tensor_list_mmap_fds(tensor_list)
self._data_queue.put(None)
except KeyboardInterrupt:
# NOTE: Main process will raise KeyboardInterrupt anyways, ignore it in child process
pass
except:
self._data_queue.cancel_join_thread()
self._data_queue.close()
six.reraise(*sys.exc_info())
def _reader_thread_loop_with_process(self):
get_sample_try_time = 0
while not self._thread_done_event.is_set():
try:
# NOTE: [ avoid hanging ] Even with carefully designed data dependencies
...@@ -526,33 +627,25 @@ class DygraphGeneratorLoader(DataLoaderBase):
# still happen when data in queue is corrupted (e.g., due to
# Queue.cancel_join_thread or unexpected exit). So we set a timeout whenever
# we try to get data from `data_queue`
# NOTE: [ avoid failed quickly ] The value of QUEUE_GET_TIMEOUT is
# relatively long, currently 60 seconds, because in some models, if the
# reader child process starts with a heavy burden, it does not have enough
# time to put the data in the queue when the main process starts trying to
# get data from the queue. In that case, the child thread needs to wait
# slightly longer.
tensor_list = self._data_queue.get(timeout=QUEUE_GET_TIMEOUT)
except queue.Empty:
get_sample_try_time += 1
if get_sample_try_time > MAX_GET_FAILED_TIME:
self._exit_thread_unexpectedly()
raise RuntimeError(
"DataLoader reader thread has not read data for a long time (60s)."
)
else:
# NOTE: [ avoid failed quickly ] Sometimes if the reader child process has a heavy burden,
# the child process has no enough time to put the data in the queue when the main process
# start trying to get data from queue. At this time, failure to read data should not be
# counted as a fatal error, there should be a certain number of attempts.
continue
if not self._thread_done_event.is_set():
if tensor_list is not None:
try:
array = core.LoDTensorArray()
for tensor in tensor_list:
array.append(tensor)
if not self._blocking_queue.push(array):
self._blocking_queue.close()
except:
...
...@@ -200,6 +200,8 @@ if (APPLE OR WIN32)
list(REMOVE_ITEM TEST_OPS test_imperative_data_loader)
list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_exception)
list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_process)
list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_fds_clear)
list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_exit_func)
list(REMOVE_ITEM TEST_OPS test_imperative_signal_handler)
endif()
...
...@@ -44,7 +44,7 @@ def batch_generator_creator(batch_size, batch_num):
return __reader__
class TestDygraphDataLoader(unittest.TestCase):
def setUp(self):
self.batch_size = 8
self.batch_num = 4
...
...@@ -27,7 +27,7 @@ def get_random_images_and_labels(image_shape, label_shape):
return image, label
class TestDygraphDataLoaderWithException(unittest.TestCase):
def setUp(self):
self.batch_size = 8
self.batch_num = 4
...@@ -63,7 +63,7 @@ class TestDygraphhDataLoaderWithException(unittest.TestCase):
exception = ex
self.assertIsNotNone(exception)
def test_multi_process_with_process_expection(self):
def error_sample_genarator(batch_num):
def __reader__():
for _ in range(batch_num):
...@@ -81,8 +81,6 @@ class TestDygraphhDataLoaderWithException(unittest.TestCase):
for _ in loader():
print("test_multi_process_with_thread_expection")
except core.EnforceNotMet as ex:
self.assertIn("Blocking queue is killed",
cpt.get_exception_message(ex))
exception = ex
self.assertIsNotNone(exception)
...
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import signal
import unittest
import multiprocessing
import time
import paddle.compat as cpt
if sys.version_info[0] == 2:
import Queue as queue
else:
import queue
from paddle.fluid.reader import multiprocess_queue_set, _cleanup, CleanupFuncRegistrar
# NOTE: These special functions cannot be detected by the existing coverage mechanism,
# so the following unittests are added for these internal functions.
class TestDygraphDataLoaderCleanUpFunc(unittest.TestCase):
def setUp(self):
self.capacity = 10
def test_clear_queue_set(self):
test_queue = queue.Queue(self.capacity)
global multiprocess_queue_set
multiprocess_queue_set.add(test_queue)
for i in range(0, self.capacity):
test_queue.put(i)
_cleanup()
class TestRegisterExitFunc(unittest.TestCase):
# This function does not need to be implemented in this case
def none_func(self):
pass
def test_not_callable_func(self):
exception = None
try:
CleanupFuncRegistrar.register(5)
except TypeError as ex:
self.assertIn("is not callable", cpt.get_exception_message(ex))
exception = ex
self.assertIsNotNone(exception)
def test_old_handler_for_sigint(self):
CleanupFuncRegistrar.register(
function=self.none_func, signals=[signal.SIGINT])
def test_signal_wrapper_by_sigchld(self):
# This function does not need to be implemented in this case
def __test_process__():
pass
CleanupFuncRegistrar.register(
function=self.none_func, signals=[signal.SIGCHLD])
exception = None
try:
test_process = multiprocessing.Process(target=__test_process__)
test_process.start()
time.sleep(3)
except SystemExit as ex:
exception = ex
self.assertIsNotNone(exception)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle.fluid as fluid
from paddle.fluid import core
def get_random_images_and_labels(image_shape, label_shape):
image = np.random.random(size=image_shape).astype('float32')
label = np.random.random(size=label_shape).astype('int64')
return image, label
def batch_generator_creator(batch_size, batch_num):
def __reader__():
for _ in range(batch_num):
batch_image, batch_label = get_random_images_and_labels(
[batch_size, 784], [batch_size, 1])
yield batch_image, batch_label
return __reader__
class TestDygraphDataLoaderMmapFdsClear(unittest.TestCase):
def setUp(self):
self.batch_size = 8
self.batch_num = 100
self.epoch_num = 2
self.capacity = 50
def prepare_data_loader(self):
loader = fluid.io.DataLoader.from_generator(
capacity=self.capacity, use_multiprocess=True)
loader.set_batch_generator(
batch_generator_creator(self.batch_size, self.batch_num),
places=fluid.CPUPlace())
return loader
def run_one_epoch_with_break(self, loader):
for step_id, data in enumerate(loader()):
image, label = data
relu = fluid.layers.relu(image)
self.assertEqual(image.shape, [self.batch_size, 784])
self.assertEqual(label.shape, [self.batch_size, 1])
self.assertEqual(relu.shape, [self.batch_size, 784])
if step_id == 30:
break
def test_data_loader_break(self):
with fluid.dygraph.guard():
loader = self.prepare_data_loader()
for _ in range(self.epoch_num):
self.run_one_epoch_with_break(loader)
break
def test_data_loader_continue_break(self):
with fluid.dygraph.guard():
loader = self.prepare_data_loader()
for _ in range(self.epoch_num):
self.run_one_epoch_with_break(loader)
if __name__ == '__main__':
unittest.main()
...@@ -16,6 +16,7 @@ import sys
import unittest
import numpy as np
import paddle.fluid as fluid
from paddle.fluid import core
if sys.version_info[0] == 2:
import Queue as queue
...@@ -41,7 +42,7 @@ def batch_generator_creator(batch_size, batch_num):
# NOTE: coverage CI can't cover child process code, so these tests are needed.
# Here test child process loop function in main process
class TestDygraphDataLoaderProcess(unittest.TestCase):
def setUp(self):
self.batch_size = 8
self.batch_num = 4
...@@ -77,7 +78,7 @@ class TestDygraphhDataLoaderProcess(unittest.TestCase):
exception = None
try:
loader._reader_process_loop()
except core.EnforceNotMet as ex:
exception = ex
self.assertIsNotNone(exception)
...