未验证 提交 a51c3272 编写于 作者: D Dun 提交者: GitHub

Merge pull request #15664 from cjld/my_checkpoint

Fix Pr #15296 async buffered reader
......@@ -257,7 +257,7 @@ void *Alloc<platform::CUDAPinnedPlace>(const platform::CUDAPinnedPlace &place,
void *ptr = buddy_allocator->Alloc(size);
if (ptr == nullptr) {
LOG(WARNING) << "cudaMallocHost Cannot allocate " << size
LOG(WARNING) << "cudaHostAlloc Cannot allocate " << size
<< " bytes in CUDAPinnedPlace";
}
if (FLAGS_init_allocated_mem) {
......
......@@ -32,7 +32,7 @@ Allocation *CPUPinnedAllocator::AllocateImpl(size_t size,
// "CPUPinnedAllocator should be used for Cross-Device Communication");
void *ptr;
PADDLE_ENFORCE(cudaMallocHost(&ptr, size));
PADDLE_ENFORCE(cudaHostAlloc(&ptr, size, cudaHostAllocPortable));
return new CPUPinnedAllocation(ptr, size);
}
} // namespace allocation
......
......@@ -19,7 +19,7 @@ namespace paddle {
namespace memory {
namespace allocation {
// Allocator uses `cudaMallocHost`
// Allocator uses `cudaHostAlloc`
class CPUPinnedAllocation : public Allocation {
public:
CPUPinnedAllocation(void *ptr, size_t size)
......
......@@ -173,14 +173,14 @@ void* CUDAPinnedAllocator::Alloc(size_t* index, size_t size) {
void* p;
// PINNED memory is visible to all CUDA contexts.
cudaError_t result = cudaMallocHost(&p, size);
cudaError_t result = cudaHostAlloc(&p, size, cudaHostAllocPortable);
if (result == cudaSuccess) {
*index = 1; // PINNED memory
cuda_pinnd_alloc_size_ += size;
return p;
} else {
LOG(WARNING) << "cudaMallocHost failed.";
LOG(WARNING) << "cudaHostAlloc failed.";
return nullptr;
}
......
......@@ -14,6 +14,7 @@
#include "paddle/fluid/operators/reader/buffered_reader.h"
#include <vector>
#include "paddle/fluid/framework/data_type.h"
namespace paddle {
namespace operators {
......@@ -24,6 +25,13 @@ BufferedReader::~BufferedReader() {
position_.front().wait();
position_.pop();
}
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
PADDLE_ENFORCE(cudaStreamDestroy(stream));
for (auto &event : events) PADDLE_ENFORCE(cudaEventDestroy(event));
}
#endif
}
BufferedReader::BufferedReader(
......@@ -33,6 +41,19 @@ BufferedReader::BufferedReader(
thread_pool_(1),
place_(place),
buffer_size_(buffer_size) {
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
compute_stream =
((platform::CUDADeviceContext *)(platform::DeviceContextPool::Instance()
.Get(place_)))
->stream();
events.resize(buffer_size);
for (auto &event : events)
PADDLE_ENFORCE(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
PADDLE_ENFORCE(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
}
#endif
cpu_buffer_.resize(buffer_size);
gpu_buffer_.resize(buffer_size);
ReadTillBufferFullAsync();
......@@ -46,6 +67,12 @@ void BufferedReader::ReadTillBufferFullAsync() {
}
void BufferedReader::ReadAsync(size_t i) {
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
PADDLE_ENFORCE(cudaEventRecord(events[i], compute_stream));
}
#endif
position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
TensorVec &cpu = cpu_buffer_[i];
reader_->ReadNext(&cpu);
......@@ -54,14 +81,41 @@ void BufferedReader::ReadAsync(size_t i) {
return -1UL;
}
#ifdef PADDLE_WITH_CUDA
// NOTE(liangdun): using async copy instead of TensorCopySync
// TensorCopySync would block other stream
if (platform::is_gpu_place(place_)) {
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
PADDLE_ENFORCE(cudaStreamWaitEvent(stream, events[i], 0));
TensorVec &gpu = gpu_buffer_[i];
gpu.resize(cpu.size());
for (size_t i = 0; i < cpu.size(); ++i) {
framework::TensorCopySync(cpu[i], place_, &gpu[i]);
gpu[i].Resize(cpu[i].dims());
gpu[i].set_layout(cpu[i].layout());
auto cpu_place = cpu[i].place();
auto cpu_ptr = cpu[i].data<void>();
auto gpu_ptr = gpu[i].mutable_data(place_, cpu[i].type());
auto size =
cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type());
if (platform::is_cuda_pinned_place(cpu_place))
memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
boost::get<platform::CUDAPinnedPlace>(cpu_place),
cpu_ptr, size, stream);
else if ((platform::is_gpu_place(cpu_place)))
memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
boost::get<platform::CUDAPlace>(cpu_place), cpu_ptr,
size, stream);
else
// if cpu place is not pinned, async copy is slower than sync copy,
// so we use sync copy instead.
memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
boost::get<platform::CPUPlace>(cpu_place), cpu_ptr, size,
0);
gpu[i].set_lod(cpu[i].lod());
}
PADDLE_ENFORCE(cudaStreamSynchronize(stream));
}
#endif
return i;
}));
}
......
......@@ -19,6 +19,9 @@
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/reader.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/gpu_info.h"
#endif
namespace paddle {
namespace operators {
......@@ -59,6 +62,11 @@ class BufferedReader : public framework::DecoratedReader {
std::vector<TensorVec> cpu_buffer_;
std::vector<TensorVec> gpu_buffer_;
size_t prev_pos_{-1UL};
#ifdef PADDLE_WITH_CUDA
cudaStream_t stream;
cudaStream_t compute_stream;
std::vector<cudaEvent_t> events;
#endif
};
} // namespace reader
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册