From 1b3081b1b400992448c38c35ad099c87c2dc20fb Mon Sep 17 00:00:00 2001 From: Chen Weihang Date: Wed, 29 Jul 2020 11:53:34 +0800 Subject: [PATCH] Simplify BufferedReader to improve DataLoader performance (#25648) * simplify buffered reader to improve DataLoader performance * fix 22 failed unittests * fix cuda pinned context condition * fix test_reader_reset failed * fix two failed unittests * change unittest place * polish error messaage * polish cast op GetExpecctedKernelType * remove debug info in unittest --- .../fluid/framework/data_device_transform.cc | 6 +- paddle/fluid/framework/tensor_util.cc | 36 +++++- paddle/fluid/operators/cast_op.cc | 13 ++- .../fluid/operators/reader/buffered_reader.cc | 105 ++++++------------ .../fluid/operators/reader/buffered_reader.h | 8 +- paddle/fluid/platform/init.cc | 4 +- paddle/fluid/platform/init_test.cc | 2 +- .../unittests/test_decoupled_py_reader.py | 10 +- .../unittests/test_generator_dataloader.py | 11 +- .../test_multiprocess_dataloader_exception.py | 2 +- .../test_multiprocess_dataloader_static.py | 10 +- .../test_multiprocess_reader_exception.py | 2 +- .../tests/unittests/test_reader_reset.py | 3 +- 13 files changed, 118 insertions(+), 94 deletions(-) diff --git a/paddle/fluid/framework/data_device_transform.cc b/paddle/fluid/framework/data_device_transform.cc index a79bc4bc2c..7d005c9690 100644 --- a/paddle/fluid/framework/data_device_transform.cc +++ b/paddle/fluid/framework/data_device_transform.cc @@ -27,8 +27,10 @@ void TransDataDevice(const Tensor &in, const platform::Place &dst_place, "supported between CPU and CUDA.")); // NOTE(yy): TransDataDevice should wait for computation of input. - platform::DeviceContextPool::Instance().Get(in.place())->Wait(); - platform::DeviceContextPool::Instance().Get(dst_place)->Wait(); + if (!platform::is_cuda_pinned_place(in.place())) { + platform::DeviceContextPool::Instance().Get(in.place())->Wait(); + platform::DeviceContextPool::Instance().Get(dst_place)->Wait(); + } // FIXME(zcd): TransDataDevice is used to transform data from GPU to CPU and // the enforced checkings have been done in GetDeviceContext, so the diff --git a/paddle/fluid/framework/tensor_util.cc b/paddle/fluid/framework/tensor_util.cc index 853abda734..50637a0c3d 100644 --- a/paddle/fluid/framework/tensor_util.cc +++ b/paddle/fluid/framework/tensor_util.cc @@ -55,8 +55,13 @@ void TensorCopy(const Tensor& src, const platform::Place& dst_place, BOOST_GET_CONST(platform::CPUPlace, src_place), src_ptr, size); } #ifdef PADDLE_WITH_CUDA - else if (platform::is_gpu_place(src_place) && // NOLINT + else if (platform::is_cuda_pinned_place(src_place) && // NOLINT platform::is_cpu_place(dst_place)) { + memory::Copy(BOOST_GET_CONST(platform::CPUPlace, dst_place), dst_ptr, + BOOST_GET_CONST(platform::CUDAPinnedPlace, src_place), src_ptr, + size); + } else if (platform::is_gpu_place(src_place) && // NOLINT + platform::is_cpu_place(dst_place)) { auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place); auto dst_cpu_place = BOOST_GET_CONST(platform::CPUPlace, dst_place); auto ctx_place = ctx.GetPlace(); @@ -77,6 +82,28 @@ void TensorCopy(const Tensor& src, const platform::Place& dst_place, auto stream = reinterpret_cast(ctx).stream(); memory::Copy(dst_gpu_place, dst_ptr, src_cpu_place, src_ptr, size, stream); + } else if (platform::is_cuda_pinned_place(src_place) && + platform::is_gpu_place(dst_place)) { + auto src_cuda_pinned_place = + BOOST_GET_CONST(platform::CUDAPinnedPlace, src_place); + auto dst_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, dst_place); + auto ctx_place = ctx.GetPlace(); + PADDLE_ENFORCE_EQ(platform::is_gpu_place(ctx_place), true, + platform::errors::PreconditionNotMet( + "Device context place mismatch. When copying Tensor " + "data from CUDA Pinned memory to GPU memory, current " + "device context place should be GPU.")); + auto ctx_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, ctx_place); + PADDLE_ENFORCE_EQ(dst_gpu_place, ctx_gpu_place, + platform::errors::PreconditionNotMet( + "The target GPU device and current device context do " + "not match. The target GPU device number is %d, but " + "device context GPU number is %d.", + dst_gpu_place.device, ctx_gpu_place.device)); + auto stream = + reinterpret_cast(ctx).stream(); + memory::Copy(dst_gpu_place, dst_ptr, src_cuda_pinned_place, src_ptr, size, + stream); } else if (platform::is_gpu_place(src_place) && platform::is_gpu_place(dst_place)) { auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place); @@ -148,8 +175,13 @@ void TensorCopySync(const Tensor& src, const platform::Place& dst_place, BOOST_GET_CONST(platform::CPUPlace, src_place), src_ptr, size); } #ifdef PADDLE_WITH_CUDA - else if (platform::is_gpu_place(src_place) && // NOLINT + else if (platform::is_cuda_pinned_place(src_place) && // NOLINT platform::is_cpu_place(dst_place)) { + memory::Copy(BOOST_GET_CONST(platform::CPUPlace, dst_place), dst_ptr, + BOOST_GET_CONST(platform::CUDAPinnedPlace, src_place), src_ptr, + size); + } else if (platform::is_gpu_place(src_place) && // NOLINT + platform::is_cpu_place(dst_place)) { auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place); auto dst_cpu_place = BOOST_GET_CONST(platform::CPUPlace, dst_place); memory::Copy(dst_cpu_place, dst_ptr, src_gpu_place, src_ptr, size, nullptr); diff --git a/paddle/fluid/operators/cast_op.cc b/paddle/fluid/operators/cast_op.cc index 933d959d58..eb4483c9c5 100644 --- a/paddle/fluid/operators/cast_op.cc +++ b/paddle/fluid/operators/cast_op.cc @@ -67,10 +67,17 @@ class CastOp : public framework::OperatorWithKernel { framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext &ctx) const override { - framework::OpKernelType kt = OperatorWithKernel::GetExpectedKernelType(ctx); // CastOp kernel's device type is decided by input tensor place - kt.place_ = ctx.Input("X")->place(); - return kt; + auto *tensor = ctx.Input("X"); + PADDLE_ENFORCE_EQ(tensor->IsInitialized(), true, + platform::errors::PreconditionNotMet( + "The tensor of Input(X) is not initialized.")); + auto &tensor_place = tensor->place(); + // NOTE: cuda pinned tensor need to copy its data to target place + if (platform::is_cuda_pinned_place(tensor_place)) { + return framework::OpKernelType(tensor->type(), ctx.device_context()); + } + return framework::OpKernelType(tensor->type(), tensor_place); } }; diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index 4d79a7fcb2..e0bcab1fb5 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -42,22 +42,9 @@ BufferedReader::BufferedReader( place_(place), buffer_size_(buffer_size) { VLOG(1) << "BufferedReader"; -#ifdef PADDLE_WITH_CUDA - if (platform::is_gpu_place(place_)) { - int dev_idx = BOOST_GET_CONST(platform::CUDAPlace, place_).device; - compute_stream_ = - ((platform::CUDADeviceContext *)(platform::DeviceContextPool::Instance() - .Get(place_))) - ->stream(); - events_.resize(buffer_size); - for (auto &event : events_) { - event = platform::CudaEventResourcePool::Instance().New(dev_idx); - } - stream_ = platform::CudaStreamResourcePool::Instance().New(dev_idx); - } -#endif + is_same_place_ = false; cpu_buffer_.resize(buffer_size); - gpu_buffer_.resize(buffer_size); + cuda_pinned_buffer_.resize(buffer_size); ReadTillBufferFullAsync(); } @@ -77,70 +64,49 @@ void BufferedReader::ReadAsync(size_t i) { } #ifdef PADDLE_WITH_CUDA - // NOTE(liangdun): using async copy instead of TensorCopySync - // TensorCopySync would block other stream, because TensorCopySync - // issues the copying command to the default stream, it will make two - // commands from different streams cannot run concurrently. if (platform::is_gpu_place(place_)) { - TensorVec &gpu = gpu_buffer_[i]; - if (gpu.empty()) { - gpu.resize(cpu.size()); + // NOTE: [Copy processing of different input devices] + // We may accept input tensor in three different devices: + // - CPUPlace + // - CUDAPinnedPlace + // - CUDAPlace + // CUDA Stream Synchronizing is slow, in order to avoid Synchronizing + // in BufferedReader thread, we do data copy as follows: + // - If src Tensor on CPU memory, we copy it to CUDAPinned memory + // - IF src Tensor on CUDAPinned memory, we use it directly + // - IF src Tensor on CUDA memory, we use it directly + platform::CUDAPinnedPlace cuda_pinned_place; + TensorVec &cuda_pinned = cuda_pinned_buffer_[i]; + if (cuda_pinned.empty()) { + cuda_pinned.resize(cpu.size()); } else { PADDLE_ENFORCE_EQ( - gpu.size(), cpu.size(), + cuda_pinned.size(), cpu.size(), platform::errors::InvalidArgument( "Input tensor number on GPU and CPU devices are not matched.")); } - std::vector gpu_ptrs; - gpu_ptrs.reserve(cpu.size()); - for (size_t i = 0; i < cpu.size(); ++i) { - gpu[i].Resize(cpu[i].dims()); - gpu[i].set_layout(cpu[i].layout()); - gpu_ptrs.emplace_back(gpu[i].mutable_data(place_, cpu[i].type())); - } - - // NOTE(zjl): cudaStreamWaitEvent() must be called after all - // gpu[i].mutable_data() is called, since some ops release - // gpu memory immediately without waiting gpu kernel ends - platform::SetDeviceId( - BOOST_GET_CONST(platform::CUDAPlace, place_).device); - PADDLE_ENFORCE_CUDA_SUCCESS( - cudaEventRecord(events_[i].get(), compute_stream_)); - PADDLE_ENFORCE_CUDA_SUCCESS( - cudaStreamWaitEvent(stream_.get(), events_[i].get(), 0)); - + std::vector cuda_pinned_ptrs; + cuda_pinned_ptrs.reserve(cpu.size()); platform::RecordEvent record_event("BufferedReader:MemoryCopy"); for (size_t i = 0; i < cpu.size(); ++i) { - auto cpu_place = cpu[i].place(); - auto cpu_ptr = cpu[i].data(); - auto gpu_ptr = gpu_ptrs[i]; - auto size = - cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type()); - if (platform::is_cuda_pinned_place(cpu_place)) { - memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place_), gpu_ptr, - BOOST_GET_CONST(platform::CUDAPinnedPlace, cpu_place), - cpu_ptr, size, stream_.get()); - } else if ((platform::is_gpu_place(cpu_place))) { - memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place_), gpu_ptr, - BOOST_GET_CONST(platform::CUDAPlace, cpu_place), cpu_ptr, - size, stream_.get()); + if (platform::is_cpu_place(cpu[i].place())) { + cuda_pinned[i].Resize(cpu[i].dims()); + cuda_pinned[i].set_layout(cpu[i].layout()); + cuda_pinned_ptrs.emplace_back( + cuda_pinned[i].mutable_data(cuda_pinned_place, cpu[i].type())); + auto size = + cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type()); + + memory::Copy(cuda_pinned_place, cuda_pinned_ptrs[i], + BOOST_GET_CONST(platform::CPUPlace, cpu[i].place()), + cpu[i].data(), size); + cuda_pinned[i].set_lod(cpu[i].lod()); } else { - platform::CUDAPinnedPlace cuda_pinned_place; - framework::LoDTensor cuda_pinned_tensor; - cuda_pinned_tensor.Resize(cpu[i].dims()); - auto cuda_pinned_ptr = - cuda_pinned_tensor.mutable_data(cuda_pinned_place, cpu[i].type()); - memory::Copy(cuda_pinned_place, cuda_pinned_ptr, - BOOST_GET_CONST(platform::CPUPlace, cpu_place), cpu_ptr, - size); - memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place_), gpu_ptr, - cuda_pinned_place, cuda_pinned_ptr, size, stream_.get()); - PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream_.get())); + // we set same place flag & use cpu[i] directly + is_same_place_ = true; } - gpu[i].set_lod(cpu[i].lod()); } - PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream_.get())); } #endif return i; @@ -174,8 +140,9 @@ void BufferedReader::ReadNextImpl(std::vector *out) { return; } - *out = std::move(platform::is_gpu_place(place_) ? gpu_buffer_[i] - : cpu_buffer_[i]); + *out = std::move((platform::is_gpu_place(place_) && !is_same_place_) + ? cuda_pinned_buffer_[i] + : cpu_buffer_[i]); // Do not push current position into ReadAsync. Push the previous position // Since all computation in fluid are async, change the data of diff --git a/paddle/fluid/operators/reader/buffered_reader.h b/paddle/fluid/operators/reader/buffered_reader.h index 89ecea9583..4409aa4d39 100644 --- a/paddle/fluid/operators/reader/buffered_reader.h +++ b/paddle/fluid/operators/reader/buffered_reader.h @@ -61,14 +61,10 @@ class BufferedReader : public framework::DecoratedReader { // buffer, just read async and create futures as buffer size. However, to // malloc tensors every time is extremely slow. Here we store all data in // buffers and prevent alloc every time. + bool is_same_place_; std::vector cpu_buffer_; - std::vector gpu_buffer_; + std::vector cuda_pinned_buffer_; size_t prev_pos_{-1UL}; -#ifdef PADDLE_WITH_CUDA - cudaStream_t compute_stream_; - std::shared_ptr stream_; - std::vector> events_; -#endif }; } // namespace reader diff --git a/paddle/fluid/platform/init.cc b/paddle/fluid/platform/init.cc index d9c8026bd2..8e8160fe61 100644 --- a/paddle/fluid/platform/init.cc +++ b/paddle/fluid/platform/init.cc @@ -166,13 +166,15 @@ void InitDevices(bool init_p2p, const std::vector devices) { LOG(WARNING) << "Invalid devices id."; continue; } - places.emplace_back(platform::CUDAPlace(devices[i])); } if (init_p2p) { InitP2P(devices); } places.emplace_back(platform::CPUPlace()); +#ifdef PADDLE_WITH_CUDA + places.emplace_back(platform::CUDAPinnedPlace()); +#endif platform::DeviceContextPool::Init(places); #ifndef PADDLE_WITH_MKLDNN diff --git a/paddle/fluid/platform/init_test.cc b/paddle/fluid/platform/init_test.cc index 3f911843c5..6392c4f4c4 100644 --- a/paddle/fluid/platform/init_test.cc +++ b/paddle/fluid/platform/init_test.cc @@ -35,7 +35,7 @@ TEST(InitDevices, CUDA) { int count = paddle::platform::GetCUDADeviceCount(); InitDevices(true); DeviceContextPool& pool = DeviceContextPool::Instance(); - ASSERT_EQ(pool.size(), 1U + static_cast(count)); + ASSERT_EQ(pool.size(), 2U + static_cast(count)); #endif } diff --git a/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py b/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py index a16f21c0f9..f8cb6170be 100644 --- a/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py +++ b/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py @@ -122,8 +122,14 @@ class TestBase(unittest.TestCase): label = item['label'] assert image.shape() == [BATCH_SIZE, 784] assert label.shape() == [BATCH_SIZE, 1] - assert image._place()._equals(ps[i]) - assert label._place()._equals(ps[i]) + if ps[i]._equals(fluid.CPUPlace()): + assert image._place()._equals(fluid.CPUPlace()) + assert label._place()._equals(fluid.CPUPlace()) + else: + assert image._place()._equals( + fluid.CUDAPinnedPlace()) + assert label._place()._equals( + fluid.CUDAPinnedPlace()) L, = exe.run(program=prog, feed=d, fetch_list=[loss], diff --git a/python/paddle/fluid/tests/unittests/test_generator_dataloader.py b/python/paddle/fluid/tests/unittests/test_generator_dataloader.py index 0945b59321..6660bfb0c7 100644 --- a/python/paddle/fluid/tests/unittests/test_generator_dataloader.py +++ b/python/paddle/fluid/tests/unittests/test_generator_dataloader.py @@ -117,7 +117,6 @@ class TestBase(unittest.TestCase): for _ in six.moves.range(EPOCH_NUM): step = 0 for d in py_reader(): - print(d) assert len(d) == len(places), "{} != {}".format( len(d), len(places)) for i, item in enumerate(d): @@ -125,8 +124,14 @@ class TestBase(unittest.TestCase): label = item['label'] assert image.shape() == [BATCH_SIZE, 784] assert label.shape() == [BATCH_SIZE, 1] - assert image._place()._equals(ps[i]) - assert label._place()._equals(ps[i]) + if ps[i]._equals(fluid.CPUPlace()): + assert image._place()._equals(fluid.CPUPlace()) + assert label._place()._equals(fluid.CPUPlace()) + else: + assert image._place()._equals( + fluid.CUDAPinnedPlace()) + assert label._place()._equals( + fluid.CUDAPinnedPlace()) L, = exe.run(program=prog, feed=d, fetch_list=[loss], diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py index 617527242f..f3b15835b9 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py @@ -154,7 +154,7 @@ class TestDataLoaderWorkerLoop(unittest.TestCase): def run_with_worker_done(self, use_shared_memory=True): try: - place = fluid.CUDAPlace(0) + place = fluid.CPUPlace() with fluid.dygraph.guard(place): dataset = RandomDataset(800) diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py index 38497f91fc..e5f44403a9 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_static.py @@ -137,8 +137,14 @@ class TestStaticDataLoader(unittest.TestCase): label = item['label'] assert image.shape() == [BATCH_SIZE, IMAGE_SIZE] assert label.shape() == [BATCH_SIZE, 1] - assert image._place()._equals(places[i]) - assert label._place()._equals(places[i]) + if places[i]._equals(fluid.CPUPlace()): + assert image._place()._equals(fluid.CPUPlace()) + assert label._place()._equals(fluid.CPUPlace()) + else: + assert image._place()._equals(fluid.CUDAPinnedPlace( + )) + assert label._place()._equals(fluid.CUDAPinnedPlace( + )) L, = exe.run(program=prog, feed=d, fetch_list=[loss], diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_reader_exception.py b/python/paddle/fluid/tests/unittests/test_multiprocess_reader_exception.py index 16006b7e3d..39cb6651a4 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_reader_exception.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_reader_exception.py @@ -77,7 +77,7 @@ class TestMultiprocessReaderException(unittest.TestCase): reader.decorate_sample_generator( decorated_reader, batch_size=batch_size, - places=fluid.cuda_places()) + places=fluid.cuda_places(0)) else: reader.decorate_sample_generator( decorated_reader, diff --git a/python/paddle/fluid/tests/unittests/test_reader_reset.py b/python/paddle/fluid/tests/unittests/test_reader_reset.py index cb1be32935..2cef896aa7 100644 --- a/python/paddle/fluid/tests/unittests/test_reader_reset.py +++ b/python/paddle/fluid/tests/unittests/test_reader_reset.py @@ -62,7 +62,8 @@ class TestReaderReset(unittest.TestCase): paddle.batch( self.prepare_data(), batch_size=self.batch_size)) - train_cp = compiler.CompiledProgram(main_prog).with_data_parallel() + train_cp = compiler.CompiledProgram(main_prog).with_data_parallel( + places=[place]) batch_id = 0 pass_count = 0 -- GitLab