Commit a900015c authored by: D Dun Liang

add async copy and pinned place

Parent adc96e06
......@@ -14,6 +14,7 @@
#include "paddle/fluid/operators/reader/buffered_reader.h"
#include <vector>
#include "paddle/fluid/framework/data_type.h"
namespace paddle {
namespace operators {
......@@ -24,6 +25,12 @@ BufferedReader::~BufferedReader() {
position_.front().wait();
position_.pop();
}
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
PADDLE_ENFORCE(cudaStreamDestroy(stream));
}
#endif
}
BufferedReader::BufferedReader(
......@@ -33,6 +40,12 @@ BufferedReader::BufferedReader(
thread_pool_(1),
place_(place),
buffer_size_(buffer_size) {
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
PADDLE_ENFORCE(cudaStreamCreate(&stream));
}
#endif
cpu_buffer_.resize(buffer_size);
gpu_buffer_.resize(buffer_size);
ReadTillBufferFullAsync();
......@@ -54,14 +67,35 @@ void BufferedReader::ReadAsync(size_t i) {
return -1UL;
}
#ifdef PADDLE_WITH_CUDA
// NOTE(liangdun): using async copy instead of TensorCopySync
// TensorCopySync would block other stream
if (platform::is_gpu_place(place_)) {
TensorVec &gpu = gpu_buffer_[i];
gpu.resize(cpu.size());
for (size_t i = 0; i < cpu.size(); ++i) {
framework::TensorCopySync(cpu[i], place_, &gpu[i]);
gpu[i].Resize(cpu[i].dims());
gpu[i].set_layout(cpu[i].layout());
auto cpu_place = cpu[i].place();
auto cpu_ptr = cpu[i].data<void>();
auto gpu_ptr = gpu[i].mutable_data(place_, cpu[i].type());
auto size =
cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type());
if (platform::is_cuda_pinned_place(cpu_place))
memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
boost::get<platform::CUDAPinnedPlace>(cpu_place),
cpu_ptr, size, stream);
else
// if cpu place is not pinned, async copy is slower than sync copy,
// so we use sync copy instead.
memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
boost::get<platform::CPUPlace>(cpu_place), cpu_ptr, size,
0);
gpu[i].set_lod(cpu[i].lod());
}
PADDLE_ENFORCE(cudaStreamSynchronize(stream));
}
#endif
return i;
}));
}
......
......@@ -19,6 +19,9 @@
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/reader.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/gpu_info.h"
#endif
namespace paddle {
namespace operators {
......@@ -59,6 +62,9 @@ class BufferedReader : public framework::DecoratedReader {
std::vector<TensorVec> cpu_buffer_;
std::vector<TensorVec> gpu_buffer_;
size_t prev_pos_{-1UL};
#ifdef PADDLE_WITH_CUDA
cudaStream_t stream;
#endif
};
} // namespace reader
......
......@@ -483,6 +483,7 @@ def _py_reader(capacity,
lod_levels=None,
name=None,
use_double_buffer=True,
use_cuda_pinned_place=False,
feed_list=None):
if feed_list is not None:
......@@ -565,7 +566,10 @@ def _py_reader(capacity,
for item in tensors:
if not isinstance(item, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(item, core.CPUPlace())
if use_cuda_pinned_place:
tmp.set(item, core.CUDAPinnedPlace())
else:
tmp.set(item, core.CPUPlace())
item = tmp
array.append(item)
......@@ -635,7 +639,8 @@ def py_reader(capacity,
dtypes,
lod_levels=None,
name=None,
use_double_buffer=True):
use_double_buffer=True,
use_cuda_pinned_place=None):
"""
Create a Python reader for data feeding in Python
......@@ -659,6 +664,9 @@ def py_reader(capacity,
name(basestring): The prefix Python queue name and Reader name. None will
be generated automatically.
use_double_buffer(bool): Whether use double buffer or not.
use_cuda_pinned_place(bool): Whether to use CUDA pinned place. This
option only takes effect when double buffering is enabled and the
library is compiled with CUDA. If left as None, it defaults to True
when both of those conditions hold.
Returns:
Variable: A Reader from which we can get feeding data.
......@@ -754,13 +762,22 @@ def py_reader(capacity,
>>> except fluid.core.EOFException:
>>> test_reader.reset()
"""
if use_double_buffer and core.is_compiled_with_cuda():
if use_cuda_pinned_place == None:
use_cuda_pinned_place = True
else:
if use_cuda_pinned_place:
raise RuntimeError(
"use_cuda_pinned_place can only be used with double buffer and cuda enabled."
)
return _py_reader(
capacity=capacity,
shapes=shapes,
dtypes=dtypes,
lod_levels=lod_levels,
name=name,
use_double_buffer=use_double_buffer)
use_double_buffer=use_double_buffer,
use_cuda_pinned_place=use_cuda_pinned_place)
def create_py_reader_by_data(capacity,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register