diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc
index f5d55791d86c68bf800b869ee2be981bd6ab63b5..17c84530b23e667d8da4bf18cf44a89d44b1b51e 100644
--- a/paddle/fluid/operators/reader/buffered_reader.cc
+++ b/paddle/fluid/operators/reader/buffered_reader.cc
@@ -68,7 +68,6 @@ BufferedReader::BufferedReader(
     stream_ = platform::NpuStreamResourcePool::Instance().New(dev_idx);
   }
 #endif
-  is_same_place_ = false;
   cpu_buffer_.resize(buffer_size);
   cuda_buffer_.resize(buffer_size);
   npu_buffer_.resize(buffer_size);
@@ -116,7 +115,7 @@ void BufferedReader::ReadAsync(size_t i) {
     std::vector<void *> cuda_pinned_ptrs;
     cuda_pinned_ptrs.reserve(cpu.size());
     platform::RecordEvent record_event("BufferedReader:MemoryCopy");
-    // NODE(chenwehiang): When we use CUDAPinned Memory, we need call
+    // NODE(chenweihang): When we use CUDAPinned Memory, we need call
     // cudaHostAlloc, that is a CUDA API, calling CUDA API need load
     // cuda lib into device, it will cost hundreds of MB of GPU memory.
     // If we don't set Device here, which will use CUDAPlace(0) default.
@@ -126,18 +125,21 @@ void BufferedReader::ReadAsync(size_t i) {
       if (platform::is_cpu_place(cpu[i].place())) {
         cuda[i].Resize(cpu[i].dims());
         cuda[i].set_layout(cpu[i].layout());
-        cuda_pinned_ptrs.emplace_back(
-            cuda[i].mutable_data(cuda_pinned_place, cpu[i].type()));
+        cuda_pinned_ptrs[i] =
+            cuda[i].mutable_data(cuda_pinned_place, cpu[i].type());
         auto size =
             cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type());
 
         memory::Copy(cuda_pinned_place, cuda_pinned_ptrs[i],
                      BOOST_GET_CONST(platform::CPUPlace, cpu[i].place()),
                      cpu[i].data(), size);
 
+        cuda[i].set_lod(cpu[i].lod());
       } else {
-        // we set same place flag & use cpu[i] directly
-        is_same_place_ = true;
+        // Here the cpu[i]'s place may be CUDAPlace, CUDAPinnedPlace, or
+        // others, we don't copy the memory of it to CUDAPinnedPlace, but
+        // we should share tensor data to cuda[i]
+        cuda[i].ShareDataWith(cpu[i]);
       }
     }
   } else {
@@ -296,9 +298,9 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
     return;
   }
 
-  if (platform::is_gpu_place(place_) && !is_same_place_) {
+  if (platform::is_gpu_place(place_)) {
     *out = std::move(cuda_buffer_[i]);
-  } else if (platform::is_npu_place(place_) && !is_same_place_) {
+  } else if (platform::is_npu_place(place_)) {
     *out = std::move(npu_buffer_[i]);
   } else {
     *out = std::move(cpu_buffer_[i]);
diff --git a/paddle/fluid/operators/reader/buffered_reader.h b/paddle/fluid/operators/reader/buffered_reader.h
index 9f7b0e753281eb2e6476bc931b454b3b15340c3c..5b4bbc7d62cd8f1cdb64b0454279dada2f1a0e69 100644
--- a/paddle/fluid/operators/reader/buffered_reader.h
+++ b/paddle/fluid/operators/reader/buffered_reader.h
@@ -67,7 +67,6 @@ class BufferedReader : public framework::DecoratedReader {
   // buffer, just read async and create futures as buffer size. However, to
   // malloc tensors every time is extremely slow. Here we store all data in
   // buffers and prevent alloc every time.
-  bool is_same_place_;
   std::vector<TensorVec> cpu_buffer_;
   std::vector<TensorVec> cuda_buffer_;
   std::vector<TensorVec> npu_buffer_;
diff --git a/python/paddle/fluid/tests/unittests/test_dataloader_dataset.py b/python/paddle/fluid/tests/unittests/test_dataloader_dataset.py
index b8c498fe4a3c71296101bc08e6bbbe0887ac8b6c..08589f0191d8c698ecd8a4017d6c14fa476610b1 100644
--- a/python/paddle/fluid/tests/unittests/test_dataloader_dataset.py
+++ b/python/paddle/fluid/tests/unittests/test_dataloader_dataset.py
@@ -14,9 +14,12 @@
 
 from __future__ import division
 
+import sys
 import unittest
 import numpy as np
+import paddle
+import paddle.vision.transforms as transforms
 import paddle.fluid as fluid
 from paddle.io import *
 
 
@@ -37,5 +40,48 @@ class TestDatasetAbstract(unittest.TestCase):
             pass
 
 
+class TestDatasetWithDiffOutputPlace(unittest.TestCase):
+    def get_dataloader(self, num_workers):
+        dataset = paddle.vision.datasets.MNIST(
+            mode='test', transform=transforms.ToTensor())
+        loader = paddle.io.DataLoader(
+            dataset, batch_size=32, num_workers=num_workers, shuffle=True)
+        return loader
+
+    def run_check_on_cpu(self):
+        paddle.set_device('cpu')
+        loader = self.get_dataloader(0)
+        for image, label in loader:
+            self.assertTrue(image.place.is_cpu_place())
+            self.assertTrue(label.place.is_cpu_place())
+            break
+
+    def test_single_process(self):
+        self.run_check_on_cpu()
+        if paddle.is_compiled_with_cuda():
+            # Get (image, label) tuple from MNIST dataset
+            # - the image is on CUDAPlace, label is on CPUPlace
+            paddle.set_device('gpu')
+            loader = self.get_dataloader(0)
+            for image, label in loader:
+                self.assertTrue(image.place.is_gpu_place())
+                self.assertTrue(label.place.is_cuda_pinned_place())
+                break
+
+    def test_multi_process(self):
+        # DataLoader with multi-process mode is not supported on MacOs and Windows currently
+        if sys.platform != 'darwin' and sys.platform != 'win32':
+            self.run_check_on_cpu()
+            if paddle.is_compiled_with_cuda():
+                # Get (image, label) tuple from MNIST dataset
+                # - the image and label are on CPUPlace
+                paddle.set_device('gpu')
+                loader = self.get_dataloader(1)
+                for image, label in loader:
+                    self.assertTrue(image.place.is_cuda_pinned_place())
+                    self.assertTrue(label.place.is_cuda_pinned_place())
+                    break
+
+
 if __name__ == '__main__':
     unittest.main()
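
Note: the new unit tests above encode the output places that this patch establishes. As a minimal standalone sketch (not part of the patch, and assuming a CUDA build of Paddle with the MNIST test split available), the same behaviour can be observed like this:

    # Sketch of the behaviour asserted by the new tests (assumes a CUDA build
    # of Paddle; downloads the MNIST test split on first run).
    import paddle
    import paddle.vision.transforms as transforms

    paddle.set_device('gpu')
    dataset = paddle.vision.datasets.MNIST(
        mode='test', transform=transforms.ToTensor())

    # num_workers=0 (single-process): the image tensor stays on CUDAPlace and
    # the label is copied to CUDAPinnedPlace; num_workers=1 (multi-process):
    # both come back on CUDAPinnedPlace, since workers return CPU tensors.
    for num_workers in (0, 1):
        loader = paddle.io.DataLoader(
            dataset, batch_size=32, num_workers=num_workers)
        image, label = next(iter(loader))
        print(num_workers, image.place, label.place)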