diff --git a/paddle/fluid/imperative/CMakeLists.txt b/paddle/fluid/imperative/CMakeLists.txt index 148eaef38bf539412c9cce3b14dd33d39f999c5a..4250ba20eae5936527049957d259f19fb82743e6 100644 --- a/paddle/fluid/imperative/CMakeLists.txt +++ b/paddle/fluid/imperative/CMakeLists.txt @@ -10,6 +10,7 @@ cc_library(engine SRCS engine.cc DEPS layer gradient_accumulator) cc_library(imperative_profiler SRCS profiler.cc) if(NOT WIN32) cc_library(nccl_context SRCS nccl_context.cc DEPS device_context) + cc_library(data_loader SRCS data_loader.cc DEPS enforce) endif(NOT WIN32) add_subdirectory(tests) diff --git a/paddle/fluid/imperative/data_loader.cc b/paddle/fluid/imperative/data_loader.cc new file mode 100644 index 0000000000000000000000000000000000000000..5b20dfdf6d5290adbf756c09a51fd5f0cdf393df --- /dev/null +++ b/paddle/fluid/imperative/data_loader.cc @@ -0,0 +1,142 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef _WIN32 + +#include "paddle/fluid/imperative/data_loader.h" + +#include +#include + +#include +#include +#include + +#include "paddle/fluid/platform/enforce.h" + +namespace paddle { +namespace imperative { + +static std::map load_process_pids; + +void SetLoadProcessPID(int64_t key, pid_t pid) { + VLOG(3) << "Dygraph Data Loader: set loader child process PID (" << key + << ", " << pid << ")"; + load_process_pids[key] = pid; +} + +void EraseLoadProcessPID(int64_t key) { + auto it = load_process_pids.find(key); + // Note: Can not find key also possible + if (it != load_process_pids.end()) { + VLOG(3) << "Dygraph Data Loader: erase loader child process PID (" << key + << ")"; + load_process_pids.erase(it); + } else { + VLOG(3) << "Dygraph Data Loader: The dygrph loader (id: " << key + << ") you want erase does not exist."; + } +} + +// sigaction doc: http://man7.org/linux/man-pages/man2/sigaction.2.html +// sigemptyset doc: https://linux.die.net/man/3/sigemptyset +// siginfo_t doc: https://www.mkssoftware.com/docs/man5/siginfo_t.5.asp +// waitid doc: https://linux.die.net/man/2/waitid + +#define SIGNAL_HANDLE(SIGNAL) \ + do { \ + struct sigaction sa; \ + sa.sa_handler = SIG_DFL; \ + sa.sa_flags = 0; \ + if (sigemptyset(&sa.sa_mask) != 0 || \ + sigaction(SIGNAL, &sa, nullptr) != 0) { \ + _exit(EXIT_FAILURE); \ + } else { \ + raise(SIGNAL); \ + } \ + } while (0) + +#define REGISTER_SIGNAL_HANDLER(SIGNAL, HANDLER_NAME) \ + static void HANDLER_NAME(int sig, siginfo_t *info, void *ctx) { \ + SIGNAL_HANDLE(SIGNAL); \ + } + +#define REGISTER_SPEC_SIGNAL_HANDLER(SIGNAL, HANDLER_NAME) \ + static void HANDLER_NAME(int sig, siginfo_t *info, void *ctx) { \ + if (info->si_pid == getppid()) { \ + _exit(EXIT_SUCCESS); \ + } \ + SIGNAL_HANDLE(SIGNAL); \ + } + +REGISTER_SIGNAL_HANDLER(SIGSEGV, SIGSEGV_handler); +REGISTER_SIGNAL_HANDLER(SIGBUS, SIGBUS_handler); +REGISTER_SPEC_SIGNAL_HANDLER(SIGTERM, SIGTERM_handler); + +static inline void setSignalHandler(int signal, + void (*handler)(int, siginfo_t *, void *), + struct sigaction *old_sa_ptr) { + struct sigaction sa; + sa.sa_sigaction = handler; + sa.sa_flags = SA_RESTART | SA_SIGINFO | SA_NOCLDSTOP | SA_NODEFER; + if (sigemptyset(&sa.sa_mask) != 0 || + sigaction(signal, &sa, old_sa_ptr) != 0) { + PADDLE_THROW(platform::errors::Fatal( + "An error occurred while setting handler for %s.", strsignal(signal))); + } +} + +// Note: maybe need to add other signal handler +void SetLoadProcessSignalHandler() { + setSignalHandler(SIGSEGV, &SIGSEGV_handler, nullptr); + setSignalHandler(SIGBUS, &SIGBUS_handler, nullptr); + setSignalHandler(SIGTERM, &SIGTERM_handler, nullptr); +} + +void ThrowErrorIfLoadProcessFailed() { + int error; + pid_t process_pid; + siginfo_t infop; + + for (auto &w : load_process_pids) { + process_pid = w.second; + // Use waitid rather than waitpid so that we can set NOWAIT, and that Python + // and other handlers can get whatever info they want about the child. + infop.si_pid = 0; + VLOG(3) << "Dygraph Data Loader: monitor loader child process " + << process_pid; + error = waitid(P_PID, process_pid, &infop, WEXITED | WNOHANG | WNOWAIT); + // ignore errors and case with no waitable child + if (error < 0 || infop.si_pid == 0) continue; + if (infop.si_code == CLD_EXITED && + infop.si_status != EXIT_SUCCESS) { // exit with error + PADDLE_THROW(platform::errors::Fatal( + "DataLoader process (pid %ld) exited unexpectedly with code %d. " + "Error detailed are lost due to multiprocessing. Rerunning with " + "DataLoader.from_generator(..., use_multiprocess=False) may give " + "better error trace.", + process_pid, infop.si_status)); + } else if (infop.si_code == CLD_KILLED || + infop.si_code == CLD_DUMPED) { // killed by signal + PADDLE_THROW(platform::errors::Fatal( + "DataLoader process (pid %ld) exited is killed by signal: %s.", + process_pid, strsignal(infop.si_status))); + } + } +} + +} // namespace imperative +} // namespace paddle + +#endif diff --git a/paddle/fluid/imperative/data_loader.h b/paddle/fluid/imperative/data_loader.h new file mode 100644 index 0000000000000000000000000000000000000000..99dce7a2e39d89d995d78fb533eae0110bf1aea7 --- /dev/null +++ b/paddle/fluid/imperative/data_loader.h @@ -0,0 +1,33 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#ifndef _WIN32 + +#include +#include + +namespace paddle { +namespace imperative { + +extern void SetLoadProcessPID(int64_t key, pid_t pid); +extern void EraseLoadProcessPID(int64_t key); +extern void SetLoadProcessSignalHandler(); +extern void ThrowErrorIfLoadProcessFailed(); + +} // namespace imperative +} // namespace paddle + +#endif diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 4482dca01e6c1abc33e82190a52a71fe038a454e..14dec536387f152e293a594900109bc39d2a6eae 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -3,7 +3,7 @@ set(PYBIND_DEPS pybind python proto_desc memory executor fleet_wrapper box_wrapp analysis_predictor imperative_profiler imperative_flag save_load_util dlpack_tensor device_context) if(NOT WIN32) - set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context) + set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context data_loader) endif(NOT WIN32) if(WITH_PYTHON) diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc index b7c68610cc2b7003b87891366bda6b0933594cff..bbdafbf3d54f0c01aff215a7f721a71409c85bc4 100644 --- a/paddle/fluid/pybind/imperative.cc +++ b/paddle/fluid/pybind/imperative.cc @@ -25,6 +25,7 @@ limitations under the License. */ #include #include #include "paddle/fluid/imperative/backward_strategy.h" +#include "paddle/fluid/imperative/data_loader.h" #include "paddle/fluid/imperative/layer.h" #include "paddle/fluid/imperative/nccl_context.h" #include "paddle/fluid/imperative/profiler.h" @@ -276,6 +277,19 @@ void BindImperative(py::module *m_ptr) { imperative::SetCurrentTracer(tracer); }); +#ifndef _WIN32 + // Dygraph DataLoader signal handler + m.def("_set_process_pid", [](int64_t key, pid_t pid) { + imperative::SetLoadProcessPID(key, pid); + }); + m.def("_erase_process_pid", + [](int64_t key) { imperative::EraseLoadProcessPID(key); }); + m.def("_set_process_signal_handler", + []() { imperative::SetLoadProcessSignalHandler(); }); + m.def("_throw_error_if_process_failed", + []() { imperative::ThrowErrorIfLoadProcessFailed(); }); +#endif + py::class_>( m, "VarBase", R"DOC()DOC") diff --git a/python/paddle/fluid/core.py b/python/paddle/fluid/core.py index 1bc69f74fdef014fe2ebee128fcf5a09ce7eeab4..067e733094e3a10f863d2279e7da198f111f509f 100644 --- a/python/paddle/fluid/core.py +++ b/python/paddle/fluid/core.py @@ -184,6 +184,11 @@ if avx_supported(): from .core_avx import _save_dygraph_dict from .core_avx import _load_dygraph_dict from .core_avx import _create_loaded_parameter + if sys.platform != 'win32': + from .core_avx import _set_process_pid + from .core_avx import _erase_process_pid + from .core_avx import _set_process_signal_handler + from .core_avx import _throw_error_if_process_failed except Exception as e: if has_avx_core: raise e @@ -220,6 +225,11 @@ if load_noavx: from .core_noavx import _save_dygraph_dict from .core_noavx import _load_dygraph_dict from .core_noavx import _create_loaded_parameter + if sys.platform != 'win32': + from .core_noavx import _set_process_pid + from .core_noavx import _erase_process_pid + from .core_noavx import _set_process_signal_handler + from .core_noavx import _throw_error_if_process_failed except Exception as e: if has_noavx_core: sys.stderr.write( diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index c298957a3fb7ce903c6e82d5ea67ca24ea762cfa..6e81e2bf83e9b0782be83435031b77648264b6de 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -12,10 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from . import core, dygraph +from . import core import sys import six -import warnings import numpy as np import threading import paddle @@ -27,6 +26,17 @@ from .unique_name import UniqueNameGenerator import logging from .dataset import DatasetBase, InMemoryDataset +### Dygraph DataLoader configs ### +import multiprocessing +import signal +# NOTE: queue has a different name in python2 and python3 +if sys.version_info[0] == 2: + import Queue as queue +else: + import queue +# NOTE: [ avoid hanging ] This value is used in getting data from another process +MP_CHECK_TIMEOUT = 10 + __all__ = ['PyReader', 'DataLoader'] data_loader_unique_name_generator = UniqueNameGenerator() @@ -76,7 +86,8 @@ class DataLoader(object): capacity=None, use_double_buffer=True, iterable=True, - return_list=False): + return_list=False, + use_multiprocess=False): """ Create a DataLoader object for loading data from Python generator. Data would be prefetched using Python thread and be pushed @@ -116,7 +127,12 @@ class DataLoader(object): the name of each feeded variables. If return_list=True, the return value on each device would be a list(LoDTensor). It is recommended to use return_list=False in static graph mode and - use return_list=True in dygraph mode. + use return_list=True in dygraph mode. + use_multiprocess (bool): whether to use multi-process to speed up + the data loading process in dygraph. Note: this parameter only + can be used in the dygraph mode. In the static graph mode, + whether this parameter is set or not has no effect. + The Default value is False. Returns: loader (DataLoader): the created DataLoader object. @@ -254,8 +270,13 @@ class DataLoader(object): assert label.shape == [BATCH_SIZE, 1] assert relu.shape == [BATCH_SIZE, 784] """ - return GeneratorLoader(feed_list, capacity, use_double_buffer, iterable, - return_list) + if in_dygraph_mode(): + return DygraphGeneratorLoader(feed_list, capacity, + use_double_buffer, iterable, + return_list, use_multiprocess) + else: + return GeneratorLoader(feed_list, capacity, use_double_buffer, + iterable, return_list) @staticmethod def from_dataset(dataset, places, drop_last=True): @@ -295,6 +316,298 @@ class DataLoader(object): return DatasetLoader(dataset, places, drop_last) +class DygraphGeneratorLoader(DataLoaderBase): + """ + The GeneratorLoader of dygraph + + The multiprocess dygraph GeneratorLoader's most functions are different from + static graph GeneratorLoader, Separate implementation to keep code readable. + """ + + def __init__(self, + feed_list=None, + capacity=None, + use_double_buffer=True, + iterable=True, + return_list=True, + use_multiprocess=False): + self._batch_reader = None + self._places = None + self._feed_list = feed_list + + if not capacity: + raise ValueError("Please give value to capacity.") + self._capacity = capacity + self._use_double_buffer = use_double_buffer + + if not iterable: + logging.warning( + "Please NOTE: dygraph can support iterable mode only. Change to iterable mode." + ) + self._iterable = True + if not return_list: + logging.warning( + "Please NOTE: dygraph can support return as list only. Change to return as list." + ) + self._return_list = True + + # NOTE: the multiprocessing in different platform is incompatible, we will solve it later + self._use_multiprocess = use_multiprocess + if self._use_multiprocess and (sys.platform == 'darwin' or + sys.platform == 'win32'): + logging.warning( + "NOTE: The multiprocess mode does not currently support MacOs and Windows." + ) + self._use_multiprocess = False + + if self._use_multiprocess: + # NOTE: the multiprocessing.Queue used to save loading data in self._process + self._data_queue = None + # NOTE: this process is used to load data asynchronously from self._batch_reader + self._process = None + + # NOTE: the C++ LoDTensorBlockingQueue instance + self._blocking_queue = None + # NOTE: 1. In multiprocess mode, this thread is used to get next batch data from + # self._data_queue, then push it into self._blocking_queue; 2. In singleprocess + # mode, this thread is used to get next batch data from self._batch_reader, then + # push it into self._blocking_queue + self._thread = None + + @property + def queue(self): + return self._blocking_queue + + @property + def iterable(self): + return self._iterable + + def _wait_thread_ends(self): + thread = self._thread + if thread is not None: + self._blocking_queue.close() + thread.join() + + def _wait_process_ends(self): + process = self._process + if process is not None: + self._data_queue.cancel_join_thread() + self._data_queue.close() + process.join() + # erase process id + core._erase_process_pid(id(self)) + + def _init_iterable(self): + self._wait_thread_ends() + if self._use_multiprocess: + self._wait_process_ends() + self._var_names = [] + self._shapes = [] + self._dtypes = [] + self._need_check_feed = [] + self._blocking_queue = core.init_lod_tensor_blocking_queue( + core.Variable(), self._capacity) + self._reader = core.create_py_reader( + self.queue, self._var_names, self._shapes, self._dtypes, + self._need_check_feed, self._places, self._use_double_buffer) + + def _start(self): + if self._use_multiprocess: + # Set data_queue and process + self._data_queue = multiprocessing.Queue(self._capacity) + self._process = multiprocessing.Process( + target=self._reader_process_loop) + self._process.daemon = True + self._process.start() + + # Set child process signal handler + # NOTE: [ avoiding hang ] 1. if the child process dies due to bus error/segfault + # or just hang, the main process will hang waiting for data, so here need to deal + # with SIGSEGV and SIGBUS of child process; 2. if the main process end before child + # process, it shuts the all its daemonic children down with a SIGTERM (instead of + # joining them without a timeout), so here nedd to deal with SIGTERM. + self._set_child_signal_handler() + + # Set reader_thread + self._thread_done_event = threading.Event() + self._thread = threading.Thread( + target=self._reader_thread_loop_with_process) + self._thread.daemon = True + self._thread.start() + else: + self._thread = threading.Thread(target=self._reader_thread_loop) + self._thread.daemon = True + self._thread.start() + + def _reset(self): + self._reader.reset() + self._wait_thread_ends() + if self._use_multiprocess: + self._wait_process_ends() + + def __iter__(self): + assert self.iterable, "DataLoader is not iterable" + assert self._batch_reader is not None, \ + "Data source of DataLoader has not set yet" + + self._init_iterable() + self._start() + return self + + def __next__(self): + try: + return self._reader.read_next_var_list() + except StopIteration: + self._reset() + six.reraise(*sys.exc_info()) + + @classmethod + def _check_input_array(cls, item): + arr = np.array(item) + if arr.dtype == np.object: + raise TypeError( + "\n\tFaild to convert input data to a regular ndarray :\n\t* Usually " + "this means the input data contains nested lists with different lengths. " + "\n\t* Check the reader function passed to 'decorate_batch_generator'" + " to locate the data causes this issue.\n\t* Please consider using " + "'fluid.create_lod_tensor' to convert it to a LoD-Tensor.") + + def _set_child_signal_handler(self): + core._set_process_pid(id(self), self._process.pid) + current_handler = signal.getsignal(signal.SIGCHLD) + if not callable(current_handler): + current_handler = None + + def __handler__(signum, frame): + core._throw_error_if_process_failed() + if current_handler is not None: + current_handler(signum, frame) + + signal.signal(signal.SIGCHLD, __handler__) + + def _reader_process_loop(self): + try: + # set signal handler + core._set_process_signal_handler() + + for sample in self._batch_reader(): + if sample is None: + raise ValueError( + "Sample in reader is None. Please check whether your dataset is valid." + ) + self._data_queue.put(sample) + self._data_queue.put(None) + except KeyboardInterrupt: + # NOTE: Main process will raise KeyboardInterrupt anyways, ignore it in child process + pass + except: + self._data_queue.cancel_join_thread() + self._data_queue.close() + six.reraise(*sys.exc_info()) + + def _reader_thread_loop_with_process(self): + while not self._thread_done_event.is_set(): + try: + # NOTE: [ avoid hanging ] Even with carefully designed data dependencies + # (i.e., a put() always corresponding to a get()), hanging on get() can + # still happen when data in queue is corrupted (e.g., due to + # Queue.cancel_join_thread or unexpected exit). So we set a timeout whenever + # we try to get data from `data_queue` + sample = self._data_queue.get(timeout=MP_CHECK_TIMEOUT) + except queue.Empty: + self._thread_done_event.set() + logging.error("The reader has not read data for a long time.") + + if not self._thread_done_event.is_set(): + if sample is not None: + try: + array = core.LoDTensorArray() + for item in sample: + if not isinstance(item, core.LoDTensor): + self._check_input_array(item) + tmp = core.LoDTensor() + tmp.set(item, core.CPUPlace()) + item = tmp + array.append(item) + if not self._blocking_queue.push(array): + self._blocking_queue.close() + except: + self._thread_done_event.set() + self._blocking_queue.kill() + self._data_queue.close() + logging.warning( + "DygraphDataLoader reader thread raised an exception." + ) + six.reraise(*sys.exc_info()) + else: + self._thread_done_event.set() + self._blocking_queue.close() + self._data_queue.close() + else: + self._blocking_queue.kill() + self._data_queue.close() + + def _reader_thread_loop(self): + try: + for sample in self._batch_reader(): + array = core.LoDTensorArray() + for item in sample: + if not isinstance(item, core.LoDTensor): + self._check_input_array(item) + tmp = core.LoDTensor() + tmp.set(item, core.CPUPlace()) + item = tmp + + array.append(item) + + if not self._blocking_queue.push(array): + break + + self._blocking_queue.close() + self._thread = None + except Exception: + self._blocking_queue.kill() + self._thread = None + logging.warning( + "DygraphDataLoader reader thread raised an exception.") + six.reraise(*sys.exc_info()) + + def set_sample_generator(self, + reader, + batch_size, + drop_last=True, + places=None): + assert batch_size > 0, "batch_size must be larger than 0" + self.set_sample_list_generator( + paddle.batch( + reader, batch_size=batch_size, drop_last=drop_last), + places=places) + return self + + def set_sample_list_generator(self, reader, places=None): + def __batch_reader_impl__(): + for batch in reader(): + slots = [] + for items in batch: + for i, item in enumerate(items): + if len(slots) < len(items): + slots.append([item]) + else: + slots[i].append(item) + yield slots + + self.set_batch_generator(__batch_reader_impl__, places) + return self + + def set_batch_generator(self, reader, places=None): + self._batch_reader = reader + assert places is not None, "Places cannot be None when DataLoader is iterable" + self._places = _convert_places(places) + assert len(self._places) == 1, \ + "Number of places must be 1 in dygraph mode" + return self + + class GeneratorLoader(DataLoaderBase): def __init__(self, feed_list=None, @@ -305,26 +618,14 @@ class GeneratorLoader(DataLoaderBase): self._tensor_reader = None self._places = None self._thread = None + self._queue = None self._feed_list = feed_list if not capacity: raise ValueError("Please give value to capacity.") - # force to use iterable mode under dygraph mode - if in_dygraph_mode(): - if not iterable: - warnings.warn( - "Please NOTE: dygraph can support iterable mode only. Change to iterable mode." - ) - self._iterable = True - if not return_list: - warnings.warn( - "Please NOTE: dygraph can support return as list only. Change to return as list." - ) - self._return_list = True - else: - self._iterable = iterable - self._return_list = return_list - if not self._feed_list: - raise Exception("Feed list must be given under static mode.") + self._iterable = iterable + self._return_list = return_list + if not self._feed_list: + raise Exception("Feed list must be given under static mode.") self._use_double_buffer = use_double_buffer self._capacity = capacity if not self._iterable: @@ -340,18 +641,12 @@ class GeneratorLoader(DataLoaderBase): def _init_iterable(self): self._wait_thread_ends() - if in_dygraph_mode(): - self._var_names = [] - self._shapes = [] - self._dtypes = [] - self._need_check_feed = [] - else: - self._var_names = [v.name for v in self._feed_list] - self._shapes = [v.shape for v in self._feed_list] - self._dtypes = [v.dtype for v in self._feed_list] - self._need_check_feed = [ - v.desc.need_check_feed() for v in self._feed_list - ] + self._var_names = [v.name for v in self._feed_list] + self._shapes = [v.shape for v in self._feed_list] + self._dtypes = [v.dtype for v in self._feed_list] + self._need_check_feed = [ + v.desc.need_check_feed() for v in self._feed_list + ] self._queue = core.init_lod_tensor_blocking_queue(core.Variable(), self._capacity) self._reader = core.create_py_reader( @@ -442,27 +737,22 @@ class GeneratorLoader(DataLoaderBase): def __next__(self): try: - if in_dygraph_mode(): - return self._reader.read_next_var_list() + if self._return_list: + return self._reader.read_next_list() else: - if self._return_list: - return self._reader.read_next_list() - else: - return self._reader.read_next() + return self._reader.read_next() except StopIteration: self._queue.close() self._reset() six.reraise(*sys.exc_info()) def start(self): - if not in_dygraph_mode(): - assert not self._iterable, "start() cannot be called when DataLoader is iterable" - self._start() + assert not self._iterable, "start() cannot be called when DataLoader is iterable" + self._start() def reset(self): - if not in_dygraph_mode(): - assert not self._iterable, "reset() cannot be called when DataLoader is iterable" - self._reset() + assert not self._iterable, "reset() cannot be called when DataLoader is iterable" + self._reset() @classmethod def _check_input_array(cls, item): @@ -518,56 +808,36 @@ class GeneratorLoader(DataLoaderBase): drop_last=True, places=None): assert batch_size > 0, "batch_size must be larger than 0" - if in_dygraph_mode(): + has_lod = False + for f in self._feed_list: + if f.lod_level != 0: + has_lod = True + break + + if has_lod: self.set_sample_list_generator( paddle.batch( reader, batch_size=batch_size, drop_last=drop_last), places=places) else: - has_lod = False - for f in self._feed_list: - if f.lod_level != 0: - has_lod = True - break - - if has_lod: - self.set_sample_list_generator( - paddle.batch( - reader, batch_size=batch_size, drop_last=drop_last), - places=places) - else: - reader = BatchedTensorProvider( - feed_list=self._feed_list, - place=core.CPUPlace(), - batch_size=batch_size, - generator=reader, - drop_last=drop_last) - self.set_batch_generator(reader, places=places) + reader = BatchedTensorProvider( + feed_list=self._feed_list, + place=core.CPUPlace(), + batch_size=batch_size, + generator=reader, + drop_last=drop_last) + self.set_batch_generator(reader, places=places) return self def set_sample_list_generator(self, reader, places=None): - if in_dygraph_mode(): - - def __tensor_reader_impl__(): - for batch in reader(): - slots = [] - for items in batch: - for i, item in enumerate(items): - if len(slots) < len(items): - slots.append([item]) - else: - slots[i].append(item) - yield slots - else: - with program_guard(Program(), Program()): - feeder = DataFeeder( - feed_list=self._feed_list, place=core.CPUPlace()) - paddle_reader = feeder.decorate_reader( - reader, multi_devices=False) + with program_guard(Program(), Program()): + feeder = DataFeeder( + feed_list=self._feed_list, place=core.CPUPlace()) + paddle_reader = feeder.decorate_reader(reader, multi_devices=False) - def __tensor_reader_impl__(): - for slots in paddle_reader(): - yield [slots[var.name] for var in self._feed_list] + def __tensor_reader_impl__(): + for slots in paddle_reader(): + yield [slots[var.name] for var in self._feed_list] self.set_batch_generator(__tensor_reader_impl__, places) return self @@ -577,9 +847,6 @@ class GeneratorLoader(DataLoaderBase): if self._iterable: assert places is not None, "Places cannot be None when DataLoader is iterable" self._places = _convert_places(places) - if in_dygraph_mode(): - assert len(self._places) == 1, \ - "Number of places must be 1 in dygraph mode" else: if places is not None: logging.info( diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index aad7649777f13ea0a2767758fb0ffbb5ca26ac63..0746590fcefc6aeaea7bfcace26f7a660909620b 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -187,6 +187,9 @@ list(REMOVE_ITEM TEST_OPS test_fuse_bn_act_pass) if (APPLE OR WIN32) list(REMOVE_ITEM TEST_OPS test_dataset) list(REMOVE_ITEM TEST_OPS test_dataset_dataloader) + list(REMOVE_ITEM TEST_OPS test_imperative_data_loader) + list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_process) + list(REMOVE_ITEM TEST_OPS test_imperative_signal_handler) endif() # Some ops need to check results when gc is enabled diff --git a/python/paddle/fluid/tests/unittests/test_imperative_data_loader.py b/python/paddle/fluid/tests/unittests/test_imperative_data_loader.py new file mode 100644 index 0000000000000000000000000000000000000000..2848bff04b9d142c917490a8ad3a76e433118605 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_imperative_data_loader.py @@ -0,0 +1,186 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import unittest +import numpy as np +import paddle.fluid as fluid +from paddle.fluid import core +import paddle.compat as cpt + + +def get_random_images_and_labels(image_shape, label_shape): + image = np.random.random(size=image_shape).astype('float32') + label = np.random.random(size=label_shape).astype('int64') + return image, label + + +def sample_generator_creator(batch_size, batch_num): + def __reader__(): + for _ in range(batch_num * batch_size): + image, label = get_random_images_and_labels([784], [1]) + yield image, label + + return __reader__ + + +def sample_list_generator_creator(batch_size, batch_num): + def __reader__(): + for _ in range(batch_num): + sample_list = [] + for _ in range(batch_size): + image, label = get_random_images_and_labels([784], [1]) + sample_list.append([image, label]) + + yield sample_list + + return __reader__ + + +def batch_generator_creator(batch_size, batch_num): + def __reader__(): + for _ in range(batch_num): + batch_image, batch_label = get_random_images_and_labels( + [batch_size, 784], [batch_size, 1]) + yield batch_image, batch_label + + return __reader__ + + +class TestDygraphhDataLoader(unittest.TestCase): + def setUp(self): + self.batch_size = 8 + self.batch_num = 4 + self.epoch_num = 2 + self.capacity = 2 + + def test_single_process_reader(self): + with fluid.dygraph.guard(): + loader = fluid.io.DataLoader.from_generator( + capacity=self.capacity, iterable=False, use_multiprocess=False) + loader.set_sample_generator( + sample_generator_creator(self.batch_size, self.batch_num), + batch_size=self.batch_size, + places=fluid.CPUPlace()) + for _ in range(self.epoch_num): + for image, label in loader(): + relu = fluid.layers.relu(image) + self.assertEqual(image.shape, [self.batch_size, 784]) + self.assertEqual(label.shape, [self.batch_size, 1]) + self.assertEqual(relu.shape, [self.batch_size, 784]) + + def test_sample_genarator(self): + with fluid.dygraph.guard(): + loader = fluid.io.DataLoader.from_generator( + capacity=self.capacity, use_multiprocess=True) + loader.set_sample_generator( + sample_generator_creator(self.batch_size, self.batch_num), + batch_size=self.batch_size, + places=fluid.CPUPlace()) + for _ in range(self.epoch_num): + for image, label in loader(): + relu = fluid.layers.relu(image) + self.assertEqual(image.shape, [self.batch_size, 784]) + self.assertEqual(label.shape, [self.batch_size, 1]) + self.assertEqual(relu.shape, [self.batch_size, 784]) + + def test_sample_list_generator(self): + with fluid.dygraph.guard(): + loader = fluid.io.DataLoader.from_generator( + capacity=self.capacity, use_multiprocess=True) + loader.set_sample_list_generator( + sample_list_generator_creator(self.batch_size, self.batch_num), + places=fluid.CPUPlace()) + for _ in range(self.epoch_num): + for image, label in loader(): + relu = fluid.layers.relu(image) + self.assertEqual(image.shape, [self.batch_size, 784]) + self.assertEqual(label.shape, [self.batch_size, 1]) + self.assertEqual(relu.shape, [self.batch_size, 784]) + + def test_batch_genarator(self): + with fluid.dygraph.guard(): + loader = fluid.io.DataLoader.from_generator( + capacity=self.capacity, use_multiprocess=True) + loader.set_batch_generator( + batch_generator_creator(self.batch_size, self.batch_num), + places=fluid.CPUPlace()) + for _ in range(self.epoch_num): + for image, label in loader(): + relu = fluid.layers.relu(image) + self.assertEqual(image.shape, [self.batch_size, 784]) + self.assertEqual(label.shape, [self.batch_size, 1]) + self.assertEqual(relu.shape, [self.batch_size, 784]) + + +class TestDygraphhDataLoaderWithException(unittest.TestCase): + def setUp(self): + self.batch_num = 4 + self.capacity = 2 + + def test_not_capacity(self): + with fluid.dygraph.guard(): + with self.assertRaisesRegexp(ValueError, + "Please give value to capacity."): + fluid.io.DataLoader.from_generator() + + def test_single_process_with_thread_expection(self): + def error_sample_genarator(batch_num): + def __reader__(): + for _ in range(batch_num): + yield [[[1, 2], [1]]] + + return __reader__ + + with fluid.dygraph.guard(): + loader = fluid.io.DataLoader.from_generator( + capacity=self.capacity, iterable=False, use_multiprocess=False) + loader.set_batch_generator( + error_sample_genarator(self.batch_num), places=fluid.CPUPlace()) + exception = None + try: + for _ in loader(): + print("test_single_process_with_thread_expection") + except core.EnforceNotMet as ex: + self.assertIn("Blocking queue is killed", + cpt.get_exception_message(ex)) + exception = ex + self.assertIsNotNone(exception) + + def test_multi_process_with_thread_expection(self): + def error_sample_genarator(batch_num): + def __reader__(): + for _ in range(batch_num): + yield [[[1, 2], [1]]] + + return __reader__ + + with fluid.dygraph.guard(): + loader = fluid.io.DataLoader.from_generator( + capacity=self.capacity, use_multiprocess=True) + loader.set_batch_generator( + error_sample_genarator(self.batch_num), places=fluid.CPUPlace()) + exception = None + try: + for _ in loader(): + print("test_multi_process_with_thread_expection") + except core.EnforceNotMet as ex: + self.assertIn("Blocking queue is killed", + cpt.get_exception_message(ex)) + exception = ex + self.assertIsNotNone(exception) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py new file mode 100644 index 0000000000000000000000000000000000000000..8ca146074191b5e66525a2e7be9d59f10a99dc3d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_imperative_data_loader_process.py @@ -0,0 +1,86 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import unittest +import numpy as np +import paddle.fluid as fluid + +if sys.version_info[0] == 2: + import Queue as queue +else: + import queue + + +def get_random_images_and_labels(image_shape, label_shape): + image = np.random.random(size=image_shape).astype('float32') + label = np.random.random(size=label_shape).astype('int64') + return image, label + + +def batch_generator_creator(batch_size, batch_num): + def __reader__(): + for _ in range(batch_num): + batch_image, batch_label = get_random_images_and_labels( + [batch_size, 784], [batch_size, 1]) + yield batch_image, batch_label + + return __reader__ + + +# NOTE: coverage CI can't cover child process code, so need these test. +# Here test child process loop function in main process +class TestDygraphhDataLoaderProcess(unittest.TestCase): + def setUp(self): + self.batch_size = 8 + self.batch_num = 4 + self.epoch_num = 2 + self.capacity = 2 + + def test_reader_process_loop(self): + with fluid.dygraph.guard(): + loader = fluid.io.DataLoader.from_generator( + capacity=self.batch_num + 1, use_multiprocess=True) + loader.set_batch_generator( + batch_generator_creator(self.batch_size, self.batch_num), + places=fluid.CPUPlace()) + loader._data_queue = queue.Queue(self.batch_num + 1) + loader._reader_process_loop() + for _ in range(self.batch_num): + loader._data_queue.get(timeout=10) + + def test_reader_process_loop_simple_none(self): + def none_sample_genarator(batch_num): + def __reader__(): + for _ in range(batch_num): + yield None + + return __reader__ + + with fluid.dygraph.guard(): + loader = fluid.io.DataLoader.from_generator( + capacity=self.batch_num + 1, use_multiprocess=True) + loader.set_batch_generator( + none_sample_genarator(self.batch_num), places=fluid.CPUPlace()) + loader._data_queue = queue.Queue(self.batch_num + 1) + exception = None + try: + loader._reader_process_loop() + except AttributeError as ex: + exception = ex + self.assertIsNotNone(exception) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_imperative_resnet.py b/python/paddle/fluid/tests/unittests/test_imperative_resnet.py index 9f609355b13186a0aea3a1b2196452148eda1c4d..106f58ccc99ffe42b77466e6dbf7b773ecee4ee2 100644 --- a/python/paddle/fluid/tests/unittests/test_imperative_resnet.py +++ b/python/paddle/fluid/tests/unittests/test_imperative_resnet.py @@ -242,8 +242,6 @@ class TestDygraphResnet(unittest.TestCase): optimizer = optimizer_setting( train_parameters, parameter_list=resnet.parameters()) np.random.seed(seed) - import random - random.seed = seed batch_py_reader = fluid.io.PyReader(capacity=1) batch_py_reader.decorate_sample_list_generator( @@ -330,8 +328,6 @@ class TestDygraphResnet(unittest.TestCase): optimizer = optimizer_setting(train_parameters) np.random.seed(seed) - import random - random.seed = seed train_reader = paddle.batch( paddle.dataset.flowers.train(use_xmap=False), batch_size=batch_size) diff --git a/python/paddle/fluid/tests/unittests/test_imperative_se_resnext.py b/python/paddle/fluid/tests/unittests/test_imperative_se_resnext.py index e0cc89962d293ab0f38cdd5b691a7f04b5a025a0..600ab15629d7a1c3ebecd638c997090b7e87c73f 100644 --- a/python/paddle/fluid/tests/unittests/test_imperative_se_resnext.py +++ b/python/paddle/fluid/tests/unittests/test_imperative_se_resnext.py @@ -316,8 +316,6 @@ class TestImperativeResneXt(unittest.TestCase): optimizer = optimizer_setting( train_parameters, parameter_list=se_resnext.parameters()) np.random.seed(seed) - import random - random.seed = seed batch_py_reader = fluid.io.PyReader(capacity=1) batch_py_reader.decorate_sample_list_generator( @@ -379,8 +377,6 @@ class TestImperativeResneXt(unittest.TestCase): optimizer = optimizer_setting(train_parameters) np.random.seed(seed) - import random - random.seed = seed train_reader = paddle.batch( paddle.dataset.flowers.train(use_xmap=False), batch_size=batch_size, diff --git a/python/paddle/fluid/tests/unittests/test_imperative_signal_handler.py b/python/paddle/fluid/tests/unittests/test_imperative_signal_handler.py new file mode 100644 index 0000000000000000000000000000000000000000..0f3033fe3b1c58a02fe0802e296c26bda6bd20f4 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_imperative_signal_handler.py @@ -0,0 +1,91 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import signal +import unittest +import multiprocessing +import time + +import paddle.compat as cpt +from paddle.fluid import core + + +def set_child_signal_handler(self, child_pid): + core._set_process_pid(id(self), child_pid) + current_handler = signal.getsignal(signal.SIGCHLD) + if not callable(current_handler): + current_handler = None + + def __handler__(signum, frame): + core._throw_error_if_process_failed() + if current_handler is not None: + current_handler(signum, frame) + + signal.signal(signal.SIGCHLD, __handler__) + + +class TestDygraphDataLoaderSingalHandler(unittest.TestCase): + def test_child_process_exit_will_error(self): + def __test_process__(): + core._set_process_signal_handler() + sys.exit(1) + + exception = None + try: + test_process = multiprocessing.Process(target=__test_process__) + test_process.start() + + set_child_signal_handler(id(self), test_process.pid) + time.sleep(1) + except core.EnforceNotMet as ex: + self.assertIn("FatalError", cpt.get_exception_message(ex)) + exception = ex + + self.assertIsNotNone(exception) + + def test_child_process_killed_by_sigsegv(self): + def __test_process__(): + core._set_process_signal_handler() + os.kill(os.getpid(), signal.SIGSEGV) + + exception = None + try: + test_process = multiprocessing.Process(target=__test_process__) + test_process.start() + + set_child_signal_handler(id(self), test_process.pid) + time.sleep(1) + except core.EnforceNotMet as ex: + self.assertIn("FatalError", cpt.get_exception_message(ex)) + exception = ex + + self.assertIsNotNone(exception) + + def test_child_process_killed_by_sigterm(self): + def __test_process__(): + core._set_process_signal_handler() + time.sleep(10) + + test_process = multiprocessing.Process(target=__test_process__) + test_process.daemon = True + test_process.start() + + set_child_signal_handler(id(self), test_process.pid) + time.sleep(1) + + +if __name__ == '__main__': + unittest.main()