未验证 提交 1b3081b1 编写于 作者: C Chen Weihang 提交者: GitHub

Simplify BufferedReader to improve DataLoader performance (#25648)

* simplify buffered reader to improve DataLoader performance

* fix 22 failed unittests

* fix cuda pinned context condition

* fix test_reader_reset failed

* fix two failed unittests

* change unittest place

* polish error messaage

* polish cast op GetExpecctedKernelType

* remove debug info in unittest
上级 55b6205d
...@@ -27,8 +27,10 @@ void TransDataDevice(const Tensor &in, const platform::Place &dst_place, ...@@ -27,8 +27,10 @@ void TransDataDevice(const Tensor &in, const platform::Place &dst_place,
"supported between CPU and CUDA.")); "supported between CPU and CUDA."));
// NOTE(yy): TransDataDevice should wait for computation of input. // NOTE(yy): TransDataDevice should wait for computation of input.
if (!platform::is_cuda_pinned_place(in.place())) {
platform::DeviceContextPool::Instance().Get(in.place())->Wait(); platform::DeviceContextPool::Instance().Get(in.place())->Wait();
platform::DeviceContextPool::Instance().Get(dst_place)->Wait(); platform::DeviceContextPool::Instance().Get(dst_place)->Wait();
}
// FIXME(zcd): TransDataDevice is used to transform data from GPU to CPU and // FIXME(zcd): TransDataDevice is used to transform data from GPU to CPU and
// the enforced checkings have been done in GetDeviceContext, so the // the enforced checkings have been done in GetDeviceContext, so the
......
...@@ -55,7 +55,12 @@ void TensorCopy(const Tensor& src, const platform::Place& dst_place, ...@@ -55,7 +55,12 @@ void TensorCopy(const Tensor& src, const platform::Place& dst_place,
BOOST_GET_CONST(platform::CPUPlace, src_place), src_ptr, size); BOOST_GET_CONST(platform::CPUPlace, src_place), src_ptr, size);
} }
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
else if (platform::is_gpu_place(src_place) && // NOLINT else if (platform::is_cuda_pinned_place(src_place) && // NOLINT
platform::is_cpu_place(dst_place)) {
memory::Copy(BOOST_GET_CONST(platform::CPUPlace, dst_place), dst_ptr,
BOOST_GET_CONST(platform::CUDAPinnedPlace, src_place), src_ptr,
size);
} else if (platform::is_gpu_place(src_place) && // NOLINT
platform::is_cpu_place(dst_place)) { platform::is_cpu_place(dst_place)) {
auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place); auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place);
auto dst_cpu_place = BOOST_GET_CONST(platform::CPUPlace, dst_place); auto dst_cpu_place = BOOST_GET_CONST(platform::CPUPlace, dst_place);
...@@ -77,6 +82,28 @@ void TensorCopy(const Tensor& src, const platform::Place& dst_place, ...@@ -77,6 +82,28 @@ void TensorCopy(const Tensor& src, const platform::Place& dst_place,
auto stream = auto stream =
reinterpret_cast<const platform::CUDADeviceContext&>(ctx).stream(); reinterpret_cast<const platform::CUDADeviceContext&>(ctx).stream();
memory::Copy(dst_gpu_place, dst_ptr, src_cpu_place, src_ptr, size, stream); memory::Copy(dst_gpu_place, dst_ptr, src_cpu_place, src_ptr, size, stream);
} else if (platform::is_cuda_pinned_place(src_place) &&
platform::is_gpu_place(dst_place)) {
auto src_cuda_pinned_place =
BOOST_GET_CONST(platform::CUDAPinnedPlace, src_place);
auto dst_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, dst_place);
auto ctx_place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_gpu_place(ctx_place), true,
platform::errors::PreconditionNotMet(
"Device context place mismatch. When copying Tensor "
"data from CUDA Pinned memory to GPU memory, current "
"device context place should be GPU."));
auto ctx_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, ctx_place);
PADDLE_ENFORCE_EQ(dst_gpu_place, ctx_gpu_place,
platform::errors::PreconditionNotMet(
"The target GPU device and current device context do "
"not match. The target GPU device number is %d, but "
"device context GPU number is %d.",
dst_gpu_place.device, ctx_gpu_place.device));
auto stream =
reinterpret_cast<const platform::CUDADeviceContext&>(ctx).stream();
memory::Copy(dst_gpu_place, dst_ptr, src_cuda_pinned_place, src_ptr, size,
stream);
} else if (platform::is_gpu_place(src_place) && } else if (platform::is_gpu_place(src_place) &&
platform::is_gpu_place(dst_place)) { platform::is_gpu_place(dst_place)) {
auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place); auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place);
...@@ -148,7 +175,12 @@ void TensorCopySync(const Tensor& src, const platform::Place& dst_place, ...@@ -148,7 +175,12 @@ void TensorCopySync(const Tensor& src, const platform::Place& dst_place,
BOOST_GET_CONST(platform::CPUPlace, src_place), src_ptr, size); BOOST_GET_CONST(platform::CPUPlace, src_place), src_ptr, size);
} }
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
else if (platform::is_gpu_place(src_place) && // NOLINT else if (platform::is_cuda_pinned_place(src_place) && // NOLINT
platform::is_cpu_place(dst_place)) {
memory::Copy(BOOST_GET_CONST(platform::CPUPlace, dst_place), dst_ptr,
BOOST_GET_CONST(platform::CUDAPinnedPlace, src_place), src_ptr,
size);
} else if (platform::is_gpu_place(src_place) && // NOLINT
platform::is_cpu_place(dst_place)) { platform::is_cpu_place(dst_place)) {
auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place); auto src_gpu_place = BOOST_GET_CONST(platform::CUDAPlace, src_place);
auto dst_cpu_place = BOOST_GET_CONST(platform::CPUPlace, dst_place); auto dst_cpu_place = BOOST_GET_CONST(platform::CPUPlace, dst_place);
......
...@@ -67,10 +67,17 @@ class CastOp : public framework::OperatorWithKernel { ...@@ -67,10 +67,17 @@ class CastOp : public framework::OperatorWithKernel {
framework::OpKernelType GetExpectedKernelType( framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext &ctx) const override { const framework::ExecutionContext &ctx) const override {
framework::OpKernelType kt = OperatorWithKernel::GetExpectedKernelType(ctx);
// CastOp kernel's device type is decided by input tensor place // CastOp kernel's device type is decided by input tensor place
kt.place_ = ctx.Input<framework::LoDTensor>("X")->place(); auto *tensor = ctx.Input<framework::LoDTensor>("X");
return kt; PADDLE_ENFORCE_EQ(tensor->IsInitialized(), true,
platform::errors::PreconditionNotMet(
"The tensor of Input(X) is not initialized."));
auto &tensor_place = tensor->place();
// NOTE: cuda pinned tensor need to copy its data to target place
if (platform::is_cuda_pinned_place(tensor_place)) {
return framework::OpKernelType(tensor->type(), ctx.device_context());
}
return framework::OpKernelType(tensor->type(), tensor_place);
} }
}; };
......
...@@ -42,22 +42,9 @@ BufferedReader::BufferedReader( ...@@ -42,22 +42,9 @@ BufferedReader::BufferedReader(
place_(place), place_(place),
buffer_size_(buffer_size) { buffer_size_(buffer_size) {
VLOG(1) << "BufferedReader"; VLOG(1) << "BufferedReader";
#ifdef PADDLE_WITH_CUDA is_same_place_ = false;
if (platform::is_gpu_place(place_)) {
int dev_idx = BOOST_GET_CONST(platform::CUDAPlace, place_).device;
compute_stream_ =
((platform::CUDADeviceContext *)(platform::DeviceContextPool::Instance()
.Get(place_)))
->stream();
events_.resize(buffer_size);
for (auto &event : events_) {
event = platform::CudaEventResourcePool::Instance().New(dev_idx);
}
stream_ = platform::CudaStreamResourcePool::Instance().New(dev_idx);
}
#endif
cpu_buffer_.resize(buffer_size); cpu_buffer_.resize(buffer_size);
gpu_buffer_.resize(buffer_size); cuda_pinned_buffer_.resize(buffer_size);
ReadTillBufferFullAsync(); ReadTillBufferFullAsync();
} }
...@@ -77,70 +64,49 @@ void BufferedReader::ReadAsync(size_t i) { ...@@ -77,70 +64,49 @@ void BufferedReader::ReadAsync(size_t i) {
} }
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
// NOTE(liangdun): using async copy instead of TensorCopySync
// TensorCopySync would block other stream, because TensorCopySync
// issues the copying command to the default stream, it will make two
// commands from different streams cannot run concurrently.
if (platform::is_gpu_place(place_)) { if (platform::is_gpu_place(place_)) {
TensorVec &gpu = gpu_buffer_[i]; // NOTE: [Copy processing of different input devices]
if (gpu.empty()) { // We may accept input tensor in three different devices:
gpu.resize(cpu.size()); // - CPUPlace
// - CUDAPinnedPlace
// - CUDAPlace
// CUDA Stream Synchronizing is slow, in order to avoid Synchronizing
// in BufferedReader thread, we do data copy as follows:
// - If src Tensor on CPU memory, we copy it to CUDAPinned memory
// - IF src Tensor on CUDAPinned memory, we use it directly
// - IF src Tensor on CUDA memory, we use it directly
platform::CUDAPinnedPlace cuda_pinned_place;
TensorVec &cuda_pinned = cuda_pinned_buffer_[i];
if (cuda_pinned.empty()) {
cuda_pinned.resize(cpu.size());
} else { } else {
PADDLE_ENFORCE_EQ( PADDLE_ENFORCE_EQ(
gpu.size(), cpu.size(), cuda_pinned.size(), cpu.size(),
platform::errors::InvalidArgument( platform::errors::InvalidArgument(
"Input tensor number on GPU and CPU devices are not matched.")); "Input tensor number on GPU and CPU devices are not matched."));
} }
std::vector<void *> gpu_ptrs; std::vector<void *> cuda_pinned_ptrs;
gpu_ptrs.reserve(cpu.size()); cuda_pinned_ptrs.reserve(cpu.size());
for (size_t i = 0; i < cpu.size(); ++i) {
gpu[i].Resize(cpu[i].dims());
gpu[i].set_layout(cpu[i].layout());
gpu_ptrs.emplace_back(gpu[i].mutable_data(place_, cpu[i].type()));
}
// NOTE(zjl): cudaStreamWaitEvent() must be called after all
// gpu[i].mutable_data() is called, since some ops release
// gpu memory immediately without waiting gpu kernel ends
platform::SetDeviceId(
BOOST_GET_CONST(platform::CUDAPlace, place_).device);
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventRecord(events_[i].get(), compute_stream_));
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaStreamWaitEvent(stream_.get(), events_[i].get(), 0));
platform::RecordEvent record_event("BufferedReader:MemoryCopy"); platform::RecordEvent record_event("BufferedReader:MemoryCopy");
for (size_t i = 0; i < cpu.size(); ++i) { for (size_t i = 0; i < cpu.size(); ++i) {
auto cpu_place = cpu[i].place(); if (platform::is_cpu_place(cpu[i].place())) {
auto cpu_ptr = cpu[i].data<void>(); cuda_pinned[i].Resize(cpu[i].dims());
auto gpu_ptr = gpu_ptrs[i]; cuda_pinned[i].set_layout(cpu[i].layout());
cuda_pinned_ptrs.emplace_back(
cuda_pinned[i].mutable_data(cuda_pinned_place, cpu[i].type()));
auto size = auto size =
cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type()); cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type());
if (platform::is_cuda_pinned_place(cpu_place)) {
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place_), gpu_ptr, memory::Copy(cuda_pinned_place, cuda_pinned_ptrs[i],
BOOST_GET_CONST(platform::CUDAPinnedPlace, cpu_place), BOOST_GET_CONST(platform::CPUPlace, cpu[i].place()),
cpu_ptr, size, stream_.get()); cpu[i].data<void>(), size);
} else if ((platform::is_gpu_place(cpu_place))) { cuda_pinned[i].set_lod(cpu[i].lod());
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place_), gpu_ptr,
BOOST_GET_CONST(platform::CUDAPlace, cpu_place), cpu_ptr,
size, stream_.get());
} else { } else {
platform::CUDAPinnedPlace cuda_pinned_place; // we set same place flag & use cpu[i] directly
framework::LoDTensor cuda_pinned_tensor; is_same_place_ = true;
cuda_pinned_tensor.Resize(cpu[i].dims());
auto cuda_pinned_ptr =
cuda_pinned_tensor.mutable_data(cuda_pinned_place, cpu[i].type());
memory::Copy(cuda_pinned_place, cuda_pinned_ptr,
BOOST_GET_CONST(platform::CPUPlace, cpu_place), cpu_ptr,
size);
memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place_), gpu_ptr,
cuda_pinned_place, cuda_pinned_ptr, size, stream_.get());
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream_.get()));
} }
gpu[i].set_lod(cpu[i].lod());
} }
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream_.get()));
} }
#endif #endif
return i; return i;
...@@ -174,7 +140,8 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) { ...@@ -174,7 +140,8 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
return; return;
} }
*out = std::move(platform::is_gpu_place(place_) ? gpu_buffer_[i] *out = std::move((platform::is_gpu_place(place_) && !is_same_place_)
? cuda_pinned_buffer_[i]
: cpu_buffer_[i]); : cpu_buffer_[i]);
// Do not push current position into ReadAsync. Push the previous position // Do not push current position into ReadAsync. Push the previous position
......
...@@ -61,14 +61,10 @@ class BufferedReader : public framework::DecoratedReader { ...@@ -61,14 +61,10 @@ class BufferedReader : public framework::DecoratedReader {
// buffer, just read async and create futures as buffer size. However, to // buffer, just read async and create futures as buffer size. However, to
// malloc tensors every time is extremely slow. Here we store all data in // malloc tensors every time is extremely slow. Here we store all data in
// buffers and prevent alloc every time. // buffers and prevent alloc every time.
bool is_same_place_;
std::vector<TensorVec> cpu_buffer_; std::vector<TensorVec> cpu_buffer_;
std::vector<TensorVec> gpu_buffer_; std::vector<TensorVec> cuda_pinned_buffer_;
size_t prev_pos_{-1UL}; size_t prev_pos_{-1UL};
#ifdef PADDLE_WITH_CUDA
cudaStream_t compute_stream_;
std::shared_ptr<platform::CudaStreamObject> stream_;
std::vector<std::shared_ptr<platform::CudaEventObject>> events_;
#endif
}; };
} // namespace reader } // namespace reader
......
...@@ -166,13 +166,15 @@ void InitDevices(bool init_p2p, const std::vector<int> devices) { ...@@ -166,13 +166,15 @@ void InitDevices(bool init_p2p, const std::vector<int> devices) {
LOG(WARNING) << "Invalid devices id."; LOG(WARNING) << "Invalid devices id.";
continue; continue;
} }
places.emplace_back(platform::CUDAPlace(devices[i])); places.emplace_back(platform::CUDAPlace(devices[i]));
} }
if (init_p2p) { if (init_p2p) {
InitP2P(devices); InitP2P(devices);
} }
places.emplace_back(platform::CPUPlace()); places.emplace_back(platform::CPUPlace());
#ifdef PADDLE_WITH_CUDA
places.emplace_back(platform::CUDAPinnedPlace());
#endif
platform::DeviceContextPool::Init(places); platform::DeviceContextPool::Init(places);
#ifndef PADDLE_WITH_MKLDNN #ifndef PADDLE_WITH_MKLDNN
......
...@@ -35,7 +35,7 @@ TEST(InitDevices, CUDA) { ...@@ -35,7 +35,7 @@ TEST(InitDevices, CUDA) {
int count = paddle::platform::GetCUDADeviceCount(); int count = paddle::platform::GetCUDADeviceCount();
InitDevices(true); InitDevices(true);
DeviceContextPool& pool = DeviceContextPool::Instance(); DeviceContextPool& pool = DeviceContextPool::Instance();
ASSERT_EQ(pool.size(), 1U + static_cast<unsigned>(count)); ASSERT_EQ(pool.size(), 2U + static_cast<unsigned>(count));
#endif #endif
} }
......
...@@ -122,8 +122,14 @@ class TestBase(unittest.TestCase): ...@@ -122,8 +122,14 @@ class TestBase(unittest.TestCase):
label = item['label'] label = item['label']
assert image.shape() == [BATCH_SIZE, 784] assert image.shape() == [BATCH_SIZE, 784]
assert label.shape() == [BATCH_SIZE, 1] assert label.shape() == [BATCH_SIZE, 1]
assert image._place()._equals(ps[i]) if ps[i]._equals(fluid.CPUPlace()):
assert label._place()._equals(ps[i]) assert image._place()._equals(fluid.CPUPlace())
assert label._place()._equals(fluid.CPUPlace())
else:
assert image._place()._equals(
fluid.CUDAPinnedPlace())
assert label._place()._equals(
fluid.CUDAPinnedPlace())
L, = exe.run(program=prog, L, = exe.run(program=prog,
feed=d, feed=d,
fetch_list=[loss], fetch_list=[loss],
......
...@@ -117,7 +117,6 @@ class TestBase(unittest.TestCase): ...@@ -117,7 +117,6 @@ class TestBase(unittest.TestCase):
for _ in six.moves.range(EPOCH_NUM): for _ in six.moves.range(EPOCH_NUM):
step = 0 step = 0
for d in py_reader(): for d in py_reader():
print(d)
assert len(d) == len(places), "{} != {}".format( assert len(d) == len(places), "{} != {}".format(
len(d), len(places)) len(d), len(places))
for i, item in enumerate(d): for i, item in enumerate(d):
...@@ -125,8 +124,14 @@ class TestBase(unittest.TestCase): ...@@ -125,8 +124,14 @@ class TestBase(unittest.TestCase):
label = item['label'] label = item['label']
assert image.shape() == [BATCH_SIZE, 784] assert image.shape() == [BATCH_SIZE, 784]
assert label.shape() == [BATCH_SIZE, 1] assert label.shape() == [BATCH_SIZE, 1]
assert image._place()._equals(ps[i]) if ps[i]._equals(fluid.CPUPlace()):
assert label._place()._equals(ps[i]) assert image._place()._equals(fluid.CPUPlace())
assert label._place()._equals(fluid.CPUPlace())
else:
assert image._place()._equals(
fluid.CUDAPinnedPlace())
assert label._place()._equals(
fluid.CUDAPinnedPlace())
L, = exe.run(program=prog, L, = exe.run(program=prog,
feed=d, feed=d,
fetch_list=[loss], fetch_list=[loss],
......
...@@ -154,7 +154,7 @@ class TestDataLoaderWorkerLoop(unittest.TestCase): ...@@ -154,7 +154,7 @@ class TestDataLoaderWorkerLoop(unittest.TestCase):
def run_with_worker_done(self, use_shared_memory=True): def run_with_worker_done(self, use_shared_memory=True):
try: try:
place = fluid.CUDAPlace(0) place = fluid.CPUPlace()
with fluid.dygraph.guard(place): with fluid.dygraph.guard(place):
dataset = RandomDataset(800) dataset = RandomDataset(800)
......
...@@ -137,8 +137,14 @@ class TestStaticDataLoader(unittest.TestCase): ...@@ -137,8 +137,14 @@ class TestStaticDataLoader(unittest.TestCase):
label = item['label'] label = item['label']
assert image.shape() == [BATCH_SIZE, IMAGE_SIZE] assert image.shape() == [BATCH_SIZE, IMAGE_SIZE]
assert label.shape() == [BATCH_SIZE, 1] assert label.shape() == [BATCH_SIZE, 1]
assert image._place()._equals(places[i]) if places[i]._equals(fluid.CPUPlace()):
assert label._place()._equals(places[i]) assert image._place()._equals(fluid.CPUPlace())
assert label._place()._equals(fluid.CPUPlace())
else:
assert image._place()._equals(fluid.CUDAPinnedPlace(
))
assert label._place()._equals(fluid.CUDAPinnedPlace(
))
L, = exe.run(program=prog, L, = exe.run(program=prog,
feed=d, feed=d,
fetch_list=[loss], fetch_list=[loss],
......
...@@ -77,7 +77,7 @@ class TestMultiprocessReaderException(unittest.TestCase): ...@@ -77,7 +77,7 @@ class TestMultiprocessReaderException(unittest.TestCase):
reader.decorate_sample_generator( reader.decorate_sample_generator(
decorated_reader, decorated_reader,
batch_size=batch_size, batch_size=batch_size,
places=fluid.cuda_places()) places=fluid.cuda_places(0))
else: else:
reader.decorate_sample_generator( reader.decorate_sample_generator(
decorated_reader, decorated_reader,
......
...@@ -62,7 +62,8 @@ class TestReaderReset(unittest.TestCase): ...@@ -62,7 +62,8 @@ class TestReaderReset(unittest.TestCase):
paddle.batch( paddle.batch(
self.prepare_data(), batch_size=self.batch_size)) self.prepare_data(), batch_size=self.batch_size))
train_cp = compiler.CompiledProgram(main_prog).with_data_parallel() train_cp = compiler.CompiledProgram(main_prog).with_data_parallel(
places=[place])
batch_id = 0 batch_id = 0
pass_count = 0 pass_count = 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册