Unverified commit db8fcf6b, authored by Dun, committed by GitHub

Merge pull request #15296 from cjld/async_double_buffered_py_reader

Async double buffered py reader
@@ -14,6 +14,7 @@
 #include "paddle/fluid/operators/reader/buffered_reader.h"
 #include <vector>
+#include "paddle/fluid/framework/data_type.h"

 namespace paddle {
 namespace operators {
@@ -24,6 +25,12 @@ BufferedReader::~BufferedReader() {
     position_.front().wait();
     position_.pop();
   }
+#ifdef PADDLE_WITH_CUDA
+  if (platform::is_gpu_place(place_)) {
+    platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
+    PADDLE_ENFORCE(cudaStreamDestroy(stream));
+  }
+#endif
 }

 BufferedReader::BufferedReader(
@@ -33,6 +40,12 @@ BufferedReader::BufferedReader(
       thread_pool_(1),
       place_(place),
       buffer_size_(buffer_size) {
+#ifdef PADDLE_WITH_CUDA
+  if (platform::is_gpu_place(place_)) {
+    platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
+    PADDLE_ENFORCE(cudaStreamCreate(&stream));
+  }
+#endif
   cpu_buffer_.resize(buffer_size);
   gpu_buffer_.resize(buffer_size);
   ReadTillBufferFullAsync();
@@ -54,14 +67,39 @@ void BufferedReader::ReadAsync(size_t i) {
       return -1UL;
     }
+#ifdef PADDLE_WITH_CUDA
+    // NOTE(liangdun): using async copy instead of TensorCopySync
+    // TensorCopySync would block other stream
     if (platform::is_gpu_place(place_)) {
       TensorVec &gpu = gpu_buffer_[i];
       gpu.resize(cpu.size());
       for (size_t i = 0; i < cpu.size(); ++i) {
-        framework::TensorCopySync(cpu[i], place_, &gpu[i]);
+        gpu[i].Resize(cpu[i].dims());
+        gpu[i].set_layout(cpu[i].layout());
+        auto cpu_place = cpu[i].place();
+        auto cpu_ptr = cpu[i].data<void>();
+        auto gpu_ptr = gpu[i].mutable_data(place_, cpu[i].type());
+        auto size =
+            cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type());
+        if (platform::is_cuda_pinned_place(cpu_place))
+          memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
+                       boost::get<platform::CUDAPinnedPlace>(cpu_place),
+                       cpu_ptr, size, stream);
+        else if ((platform::is_gpu_place(cpu_place)))
+          memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
+                       boost::get<platform::CUDAPlace>(cpu_place), cpu_ptr,
+                       size, stream);
+        else
+          // if cpu place is not pinned, async copy is slower than sync copy,
+          // so we use sync copy instead.
+          memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
+                       boost::get<platform::CPUPlace>(cpu_place), cpu_ptr, size,
+                       0);
         gpu[i].set_lod(cpu[i].lod());
       }
+      PADDLE_ENFORCE(cudaStreamSynchronize(stream));
     }
+#endif
     return i;
   }));
 }
......
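The core of the change is the copy routing in BufferedReader::ReadAsync: a batch already staged in CUDA pinned memory is copied to the GPU with an asynchronous memcpy on the reader's private stream, while pageable host memory falls back to a synchronous copy, since a driver-staged "async" copy from pageable memory is slower. Below is a minimal, self-contained CUDA C++ sketch of that pattern; it is not PaddlePaddle code, and the helper name CopyToDevice and the CHECK_CUDA macro are illustrative only.

// Standalone sketch of the copy pattern this PR adopts, assuming only the
// CUDA runtime API. Not PaddlePaddle code.
#include <cuda_runtime.h>
#include <cstdio>
#include <cstdlib>

#define CHECK_CUDA(call)                                                \
  do {                                                                  \
    cudaError_t err = (call);                                           \
    if (err != cudaSuccess) {                                           \
      fprintf(stderr, "CUDA error: %s\n", cudaGetErrorString(err));     \
      exit(1);                                                          \
    }                                                                   \
  } while (0)

// Choose async vs. sync the same way BufferedReader::ReadAsync does:
// an async copy only pays off when the source is pinned (page-locked).
void CopyToDevice(void* dst, const void* src, size_t size,
                  bool src_is_pinned, cudaStream_t stream) {
  if (src_is_pinned) {
    // Truly asynchronous: the DMA engine reads pinned memory directly,
    // so this copy can overlap with work on other streams.
    CHECK_CUDA(cudaMemcpyAsync(dst, src, size, cudaMemcpyHostToDevice, stream));
  } else {
    // Pageable memory is staged through an internal pinned buffer by the
    // driver, so a plain synchronous copy is the faster choice here.
    CHECK_CUDA(cudaMemcpy(dst, src, size, cudaMemcpyHostToDevice));
  }
}

int main() {
  const size_t size = 1 << 20;
  cudaStream_t stream;
  CHECK_CUDA(cudaStreamCreate(&stream));  // created once, as in the ctor above

  void *pinned_src, *dev_dst;
  CHECK_CUDA(cudaMallocHost(&pinned_src, size));  // page-locked host buffer
  CHECK_CUDA(cudaMalloc(&dev_dst, size));

  CopyToDevice(dev_dst, pinned_src, size, /*src_is_pinned=*/true, stream);
  // The reader drains its private stream before handing the batch out,
  // mirroring PADDLE_ENFORCE(cudaStreamSynchronize(stream)) in the diff.
  CHECK_CUDA(cudaStreamSynchronize(stream));

  CHECK_CUDA(cudaFree(dev_dst));
  CHECK_CUDA(cudaFreeHost(pinned_src));
  CHECK_CUDA(cudaStreamDestroy(stream));  // destroyed once, as in the dtor
  return 0;
}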
@@ -19,6 +19,9 @@
 #include <vector>
 #include "ThreadPool.h"
 #include "paddle/fluid/framework/reader.h"
+#ifdef PADDLE_WITH_CUDA
+#include "paddle/fluid/platform/gpu_info.h"
+#endif

 namespace paddle {
 namespace operators {
@@ -59,6 +62,9 @@ class BufferedReader : public framework::DecoratedReader {
   std::vector<TensorVec> cpu_buffer_;
   std::vector<TensorVec> gpu_buffer_;
   size_t prev_pos_{-1UL};
+#ifdef PADDLE_WITH_CUDA
+  cudaStream_t stream;
+#endif
 };

 }  // namespace reader
......
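The header change stores the stream as a raw cudaStream_t member that the constructor and destructor manage by hand, as shown above. A small RAII guard along the following lines would be an alternative design; this sketch is purely illustrative, is not part of the PR, and omits both error checking and the SetDeviceId call the real code performs for multi-GPU machines.

// Illustrative alternative (not in the PR): own the stream via RAII so its
// creation and destruction need not be spelled out in BufferedReader.
#include <cuda_runtime.h>

class ScopedCudaStream {
 public:
  ScopedCudaStream() { cudaStreamCreate(&stream_); }
  ~ScopedCudaStream() { cudaStreamDestroy(stream_); }
  // Non-copyable: the wrapper uniquely owns the underlying stream.
  ScopedCudaStream(const ScopedCudaStream&) = delete;
  ScopedCudaStream& operator=(const ScopedCudaStream&) = delete;
  cudaStream_t get() const { return stream_; }

 private:
  cudaStream_t stream_;
};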
@@ -484,7 +484,7 @@ def _py_reader(capacity,
                name=None,
                use_double_buffer=True,
                feed_list=None):
+    use_cuda_pinned_place = use_double_buffer and core.is_compiled_with_cuda()
     if feed_list is not None:
         if not isinstance(feed_list, list):
             raise TypeError("feed_list should be a list of Variable"
@@ -565,6 +565,9 @@ def _py_reader(capacity,
         for item in tensors:
             if not isinstance(item, core.LoDTensor):
                 tmp = core.LoDTensor()
-                tmp.set(item, core.CPUPlace())
+                if use_cuda_pinned_place:
+                    tmp.set(item, core.CUDAPinnedPlace())
+                else:
+                    tmp.set(item, core.CPUPlace())
                 item = tmp
......
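On the Python side, _py_reader now stages each batch in core.CUDAPinnedPlace() whenever double buffering is enabled on a CUDA build. Page-locked memory is what makes the cudaMemcpyAsync path above genuinely asynchronous, and it is also faster than a pageable transfer, which the driver must stage through an internal pinned buffer. The micro-benchmark sketch below (plain CUDA C++, arbitrary sizes, illustrative names, error checking omitted for brevity) shows one way to measure that difference with CUDA events.

// Illustrative micro-benchmark: host-to-device copy time from pageable vs.
// pinned memory. Not PaddlePaddle code; sizes and names are arbitrary.
#include <cuda_runtime.h>
#include <cstdio>
#include <cstdlib>

static float TimeCopyMs(void* dst, const void* src, size_t size) {
  cudaEvent_t start, stop;
  cudaEventCreate(&start);
  cudaEventCreate(&stop);
  cudaEventRecord(start);
  cudaMemcpy(dst, src, size, cudaMemcpyHostToDevice);
  cudaEventRecord(stop);
  cudaEventSynchronize(stop);
  float ms = 0.f;
  cudaEventElapsedTime(&ms, start, stop);
  cudaEventDestroy(start);
  cudaEventDestroy(stop);
  return ms;
}

int main() {
  const size_t size = 64 << 20;  // a 64 MB "batch"
  void* dev;
  cudaMalloc(&dev, size);

  void* pageable = malloc(size);  // ordinary host memory
  void* pinned;
  cudaMallocHost(&pinned, size);  // page-locked host memory

  printf("pageable: %.2f ms\n", TimeCopyMs(dev, pageable, size));
  printf("pinned:   %.2f ms\n", TimeCopyMs(dev, pinned, size));

  free(pageable);
  cudaFreeHost(pinned);
  cudaFree(dev);
  return 0;
}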