From 08773b6069d936c5f53935718dc6659bf8debf1c Mon Sep 17 00:00:00 2001 From: Zeng Jinle <32832641+sneaxiy@users.noreply.github.com> Date: Mon, 29 Apr 2019 21:25:15 -0500 Subject: [PATCH] fix reader default stream,test=develop (#17106) --- .../memory/allocation/retry_allocator_test.cc | 2 +- .../fluid/operators/reader/buffered_reader.cc | 38 +++++++++++-------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/paddle/fluid/memory/allocation/retry_allocator_test.cc b/paddle/fluid/memory/allocation/retry_allocator_test.cc index f0b215dac25..345b5f44d3d 100644 --- a/paddle/fluid/memory/allocation/retry_allocator_test.cc +++ b/paddle/fluid/memory/allocation/retry_allocator_test.cc @@ -39,7 +39,7 @@ TEST(RetryAllocator, RetryAllocator) { std::unique_ptr locked_allocator( new LockedAllocator(std::move(best_fit_allocator))); - size_t thread_num = 32; + size_t thread_num = 8; size_t sleep_time = 40; size_t extra_time = 10; diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index 5d93d2e32ef..418c342c8fc 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -74,12 +74,6 @@ void BufferedReader::ReadTillBufferFullAsync() { } void BufferedReader::ReadAsync(size_t i) { -#ifdef PADDLE_WITH_CUDA - if (platform::is_gpu_place(place_)) { - platform::SetDeviceId(boost::get(place_).device); - PADDLE_ENFORCE(cudaEventRecord(events_[i], compute_stream_)); - } -#endif position_.emplace(thread_pool_.enqueue([this, i]() -> size_t { TensorVec &cpu = cpu_buffer_[i]; reader_->ReadNext(&cpu); @@ -94,17 +88,34 @@ void BufferedReader::ReadAsync(size_t i) { // issues the copying command to the default stream, it will make two // commands from different streams cannot run concurrently. if (platform::is_gpu_place(place_)) { - platform::SetDeviceId(boost::get(place_).device); - PADDLE_ENFORCE(cudaStreamWaitEvent(stream_, events_[i], 0)); TensorVec &gpu = gpu_buffer_[i]; - gpu.resize(cpu.size()); - platform::RecordEvent record_event("BufferedReader:MemoryCopy"); + if (gpu.empty()) { + gpu.resize(cpu.size()); + } else { + PADDLE_ENFORCE_EQ(gpu.size(), cpu.size(), + "Input tensor number not matched"); + } + + std::vector gpu_ptrs; + gpu_ptrs.reserve(cpu.size()); for (size_t i = 0; i < cpu.size(); ++i) { gpu[i].Resize(cpu[i].dims()); gpu[i].set_layout(cpu[i].layout()); + gpu_ptrs.emplace_back(gpu[i].mutable_data(place_, cpu[i].type())); + } + + // NOTE(zjl): cudaStreamWaitEvent() must be called after all + // gpu[i].mutable_data() is called, since some ops release + // gpu memory immediately without waiting gpu kernel ends + platform::SetDeviceId(boost::get(place_).device); + PADDLE_ENFORCE(cudaEventRecord(events_[i], compute_stream_)); + PADDLE_ENFORCE(cudaStreamWaitEvent(stream_, events_[i], 0)); + + platform::RecordEvent record_event("BufferedReader:MemoryCopy"); + for (size_t i = 0; i < cpu.size(); ++i) { auto cpu_place = cpu[i].place(); auto cpu_ptr = cpu[i].data(); - auto gpu_ptr = gpu[i].mutable_data(place_, cpu[i].type()); + auto gpu_ptr = gpu_ptrs[i]; auto size = cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type()); if (platform::is_cuda_pinned_place(cpu_place)) { @@ -116,12 +127,9 @@ void BufferedReader::ReadAsync(size_t i) { boost::get(cpu_place), cpu_ptr, size, stream_); } else { - // if cpu place is not pinned, async copy is slower than sync copy, - // so we use sync copy instead. - // TODO(zcd): The default stream should not be used here. memory::Copy(boost::get(place_), gpu_ptr, boost::get(cpu_place), cpu_ptr, size, - 0); + stream_); } gpu[i].set_lod(cpu[i].lod()); } -- GitLab