diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc
index 26ff221dfa0768bd2bcc9e6485a32485f0212ac6..d5a7c50d95cb82009e8055f0279d858f690059a5 100644
--- a/paddle/fluid/operators/reader/buffered_reader.cc
+++ b/paddle/fluid/operators/reader/buffered_reader.cc
@@ -14,6 +14,7 @@
 
 #include "paddle/fluid/operators/reader/buffered_reader.h"
 #include <vector>
+#include "paddle/fluid/framework/data_type.h"
 
 namespace paddle {
 namespace operators {
@@ -24,6 +25,12 @@ BufferedReader::~BufferedReader() {
     position_.front().wait();
     position_.pop();
   }
+#ifdef PADDLE_WITH_CUDA
+  if (platform::is_gpu_place(place_)) {
+    platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
+    PADDLE_ENFORCE(cudaStreamDestroy(stream));
+  }
+#endif
 }
 
 BufferedReader::BufferedReader(
@@ -33,6 +40,12 @@ BufferedReader::BufferedReader(
       thread_pool_(1),
       place_(place),
       buffer_size_(buffer_size) {
+#ifdef PADDLE_WITH_CUDA
+  if (platform::is_gpu_place(place_)) {
+    platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
+    PADDLE_ENFORCE(cudaStreamCreate(&stream));
+  }
+#endif
   cpu_buffer_.resize(buffer_size);
   gpu_buffer_.resize(buffer_size);
   ReadTillBufferFullAsync();
@@ -54,14 +67,35 @@ void BufferedReader::ReadAsync(size_t i) {
       return -1UL;
     }
 
+#ifdef PADDLE_WITH_CUDA
+    // NOTE(liangdun): use an async copy instead of TensorCopySync;
+    // TensorCopySync would block other streams.
     if (platform::is_gpu_place(place_)) {
       TensorVec &gpu = gpu_buffer_[i];
       gpu.resize(cpu.size());
       for (size_t i = 0; i < cpu.size(); ++i) {
-        framework::TensorCopySync(cpu[i], place_, &gpu[i]);
+        gpu[i].Resize(cpu[i].dims());
+        gpu[i].set_layout(cpu[i].layout());
+        auto cpu_place = cpu[i].place();
+        auto cpu_ptr = cpu[i].data<void>();
+        auto gpu_ptr = gpu[i].mutable_data(place_, cpu[i].type());
+        auto size =
+            cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type());
+        if (platform::is_cuda_pinned_place(cpu_place))
+          memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
+                       boost::get<platform::CUDAPinnedPlace>(cpu_place),
+                       cpu_ptr, size, stream);
+        else
+          // If the CPU place is not pinned, an async copy is slower than a
+          // sync copy, so use a sync copy on the default stream instead.
+          memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
+                       boost::get<platform::CPUPlace>(cpu_place), cpu_ptr, size,
+                       0);
         gpu[i].set_lod(cpu[i].lod());
       }
+      PADDLE_ENFORCE(cudaStreamSynchronize(stream));
     }
+#endif
     return i;
   }));
 }
diff --git a/paddle/fluid/operators/reader/buffered_reader.h b/paddle/fluid/operators/reader/buffered_reader.h
index cbe2bc1b5fdd69d1a843b768e3289acd621369a6..e55572177ccd7cd18695bdecc4c65a25ffd6b5d4 100644
--- a/paddle/fluid/operators/reader/buffered_reader.h
+++ b/paddle/fluid/operators/reader/buffered_reader.h
@@ -19,6 +19,9 @@
 #include <vector>
 #include "ThreadPool.h"
 #include "paddle/fluid/framework/reader.h"
+#ifdef PADDLE_WITH_CUDA
+#include "paddle/fluid/platform/gpu_info.h"
+#endif
 
 namespace paddle {
 namespace operators {
@@ -59,6 +62,9 @@ class BufferedReader : public framework::DecoratedReader {
   std::vector<TensorVec> cpu_buffer_;
   std::vector<TensorVec> gpu_buffer_;
   size_t prev_pos_{-1UL};
+#ifdef PADDLE_WITH_CUDA
+  cudaStream_t stream;
+#endif
 };
 
 }  // namespace reader
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 9a29b2509357c93a684d736cf0d2523970fb5ff1..a5f91aad79bf45df7de2932a2d19421cba91d0f4 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -483,6 +483,7 @@ def _py_reader(capacity,
                lod_levels=None,
                name=None,
                use_double_buffer=True,
+               use_cuda_pinned_place=False,
                feed_list=None):
 
     if feed_list is not None:
@@ -565,7 +566,10 @@ def _py_reader(capacity,
                 for item in tensors:
                     if not isinstance(item, core.LoDTensor):
                         tmp = core.LoDTensor()
-                        tmp.set(item, core.CPUPlace())
+                        if use_cuda_pinned_place:
+                            tmp.set(item, core.CUDAPinnedPlace())
+                        else:
+                            tmp.set(item, core.CPUPlace())
                         item = tmp
 
                     array.append(item)
@@ -635,7 +639,8 @@ def py_reader(capacity,
               dtypes,
               lod_levels=None,
               name=None,
-              use_double_buffer=True):
+              use_double_buffer=True,
+              use_cuda_pinned_place=None):
     """
     Create a Python reader for data feeding in Python
 
@@ -659,6 +664,9 @@
        name(basestring): The prefix Python queue name and Reader name. None
            will be generated automatically.
        use_double_buffer(bool): Whether use double buffer or not.
+       use_cuda_pinned_place(bool): Whether to use CUDA pinned place or not.
+           This option only works when double buffer and CUDA are enabled;
+           if None, it is enabled automatically in that case.
 
     Returns:
        Variable: A Reader from which we can get feeding data.
@@ -754,13 +762,22 @@
    >>>     except fluid.core.EOFException:
    >>>         test_reader.reset()
    """
+    if use_double_buffer and core.is_compiled_with_cuda():
+        if use_cuda_pinned_place is None:
+            use_cuda_pinned_place = True
+    else:
+        if use_cuda_pinned_place:
+            raise RuntimeError(
+                "use_cuda_pinned_place can only be used with double buffer and cuda enabled."
+            )
    return _py_reader(
        capacity=capacity,
        shapes=shapes,
        dtypes=dtypes,
        lod_levels=lod_levels,
        name=name,
-        use_double_buffer=use_double_buffer)
+        use_double_buffer=use_double_buffer,
+        use_cuda_pinned_place=use_cuda_pinned_place)
 
 
 def create_py_reader_by_data(capacity,
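For context, a minimal usage sketch of the new flag, modeled on the existing `py_reader` docstring examples; the shapes, dtypes, and variable names are illustrative, not part of this patch:

```python
import paddle.fluid as fluid

# Stage batches in CUDA pinned memory so BufferedReader's double-buffer
# copy to the GPU can run asynchronously on its dedicated stream.
reader = fluid.layers.py_reader(
    capacity=64,
    shapes=[(-1, 3, 224, 224), (-1, 1)],
    dtypes=['float32', 'int64'],
    use_double_buffer=True,
    use_cuda_pinned_place=True)  # the default when double buffer and CUDA are on
img, label = fluid.layers.read_file(reader)
```

Passing `use_cuda_pinned_place=True` when double buffering is off or CUDA is not compiled in raises the RuntimeError added above.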