未验证 提交 bba74071 编写于 作者: Z Zeng Jinle 提交者: GitHub

add cuda resource pool for BufferedReader, test=develop (#23152)

上级 07a1df8f
......@@ -17,8 +17,8 @@
#include <utility>
#include <vector>
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
namespace operators {
namespace reader {
......@@ -32,15 +32,6 @@ BufferedReader::~BufferedReader() {
}
position_.pop();
}
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
PADDLE_ENFORCE(cudaStreamDestroy(stream_));
for (auto &event : events_) {
PADDLE_ENFORCE(cudaEventDestroy(event));
}
}
#endif
}
BufferedReader::BufferedReader(
......@@ -53,16 +44,16 @@ BufferedReader::BufferedReader(
VLOG(1) << "BufferedReader";
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
int dev_idx = boost::get<platform::CUDAPlace>(place_).device;
compute_stream_ =
((platform::CUDADeviceContext *)(platform::DeviceContextPool::Instance()
.Get(place_)))
->stream();
events_.resize(buffer_size);
for (auto &event : events_) {
PADDLE_ENFORCE(cudaEventCreateWithFlags(&event, cudaEventDisableTiming));
event = platform::CudaEventResourcePool::Instance().New(dev_idx);
}
PADDLE_ENFORCE(cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking));
stream_ = platform::CudaStreamResourcePool::Instance().New(dev_idx);
}
#endif
cpu_buffer_.resize(buffer_size);
......@@ -112,8 +103,14 @@ void BufferedReader::ReadAsync(size_t i) {
// gpu[i].mutable_data() is called, since some ops release
// gpu memory immediately without waiting gpu kernel ends
platform::SetDeviceId(boost::get<platform::CUDAPlace>(place_).device);
PADDLE_ENFORCE(cudaEventRecord(events_[i], compute_stream_));
PADDLE_ENFORCE(cudaStreamWaitEvent(stream_, events_[i], 0));
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventRecord(events_[i].get(), compute_stream_),
platform::errors::Fatal(
"cudaEventRecord raises unexpected exception"));
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaStreamWaitEvent(stream_.get(), events_[i].get(), 0),
platform::errors::Fatal(
"cudaStreamWaitEvent raises unexpected exception"));
platform::RecordEvent record_event("BufferedReader:MemoryCopy");
for (size_t i = 0; i < cpu.size(); ++i) {
......@@ -125,11 +122,11 @@ void BufferedReader::ReadAsync(size_t i) {
if (platform::is_cuda_pinned_place(cpu_place)) {
memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
boost::get<platform::CUDAPinnedPlace>(cpu_place),
cpu_ptr, size, stream_);
cpu_ptr, size, stream_.get());
} else if ((platform::is_gpu_place(cpu_place))) {
memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
boost::get<platform::CUDAPlace>(cpu_place), cpu_ptr,
size, stream_);
size, stream_.get());
} else {
platform::CUDAPinnedPlace cuda_pinned_place;
framework::LoDTensor cuda_pinned_tensor;
......@@ -140,13 +137,18 @@ void BufferedReader::ReadAsync(size_t i) {
boost::get<platform::CPUPlace>(cpu_place), cpu_ptr,
size);
memory::Copy(boost::get<platform::CUDAPlace>(place_), gpu_ptr,
cuda_pinned_place, cuda_pinned_ptr, size, stream_);
PADDLE_ENFORCE(cudaStreamSynchronize(stream_),
"cuda stream sync error.");
cuda_pinned_place, cuda_pinned_ptr, size, stream_.get());
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaStreamSynchronize(stream_.get()),
platform::errors::Fatal(
"cudaStreamSynchronize raises unexpected exception"));
}
gpu[i].set_lod(cpu[i].lod());
}
PADDLE_ENFORCE(cudaStreamSynchronize(stream_));
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaStreamSynchronize(stream_.get()),
platform::errors::Fatal(
"cudaStreamSynchronize raises unexpected exception"));
}
#endif
return i;
......
......@@ -21,6 +21,7 @@
#include "ThreadPool.h"
#include "paddle/fluid/framework/reader.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_resource_pool.h"
#include "paddle/fluid/platform/gpu_info.h"
#endif
......@@ -64,9 +65,9 @@ class BufferedReader : public framework::DecoratedReader {
std::vector<TensorVec> gpu_buffer_;
size_t prev_pos_{-1UL};
#ifdef PADDLE_WITH_CUDA
cudaStream_t stream_;
cudaStream_t compute_stream_;
std::vector<cudaEvent_t> events_;
std::shared_ptr<platform::CudaStreamObject> stream_;
std::vector<std::shared_ptr<platform::CudaEventObject>> events_;
#endif
};
......
......@@ -57,6 +57,7 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
VLOG(10) << "Create new double buffer reader on " << place;
out->Clear();
out->Reset(framework::MakeDecoratedReader<BufferedReader>(underlying_reader,
place, 2));
}
......
......@@ -80,6 +80,11 @@ cc_library(device_context SRCS device_context.cc init.cc DEPS simple_threadpool
cc_library(collective_helper SRCS collective_helper.cc DEPS framework_proto device_context enforce)
if(WITH_GPU)
cc_library(cuda_resource_pool SRCS cuda_resource_pool.cc DEPS gpu_info)
target_link_libraries(device_context cuda_resource_pool)
endif()
if(WIN32)
if(WITH_GPU AND NOT WITH_DSO)
get_property(cuda_modules GLOBAL PROPERTY CUDA_MODULES)
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_resource_pool.h"
#include "paddle/fluid/platform/gpu_info.h"
namespace paddle {
namespace platform {
// Builds one ResourcePool of CUDA streams for every device index in
// [0, platform::GetCUDADeviceCount()). Streams are created lazily by the
// per-device pool; nothing is allocated on the GPU here.
CudaStreamResourcePool::CudaStreamResourcePool() {
  const int device_count = platform::GetCUDADeviceCount();
  pool_.reserve(device_count);
  for (int dev_idx = 0; dev_idx < device_count; ++dev_idx) {
    pool_.emplace_back(ResourcePool<CudaStreamObject>::Create(
        // Creator: switch to the owning device, then make a non-blocking
        // stream on it.
        [dev_idx] {
          platform::SetDeviceId(dev_idx);
          cudaStream_t stream;
          PADDLE_ENFORCE_CUDA_SUCCESS(
              cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking),
              platform::errors::Fatal(
                  "cudaStreamCreateWithFlags raises unexpected exception"));
          return stream;
        },
        // Deleter: switch to the owning device before destroying the stream.
        [dev_idx](cudaStream_t stream) {
          platform::SetDeviceId(dev_idx);
          PADDLE_ENFORCE_CUDA_SUCCESS(
              cudaStreamDestroy(stream),
              platform::errors::Fatal(
                  "cudaStreamDestroy raises unexpected exception"));
        }));
  }
}
// Returns the process-wide stream pool; it is constructed on the first call.
CudaStreamResourcePool& CudaStreamResourcePool::Instance() {
  static CudaStreamResourcePool singleton;
  return singleton;
}
// Hands out a stream belonging to device `dev_idx`.
//
// `dev_idx` must satisfy 0 <= dev_idx < pool_.size(); otherwise the enforce
// checks below fail with InvalidArgument / OutOfRange respectively.
// The returned handle comes from the per-device ResourcePool.
std::shared_ptr<CudaStreamObject> CudaStreamResourcePool::New(int dev_idx) {
  PADDLE_ENFORCE_GE(
      dev_idx, 0,
      platform::errors::InvalidArgument(
          "dev_idx should be not less than 0, but got %d", dev_idx));
  PADDLE_ENFORCE_LT(
      dev_idx, pool_.size(),
      platform::errors::OutOfRange(
          "dev_idx should be less than device count %d, but got %d",
          pool_.size(), dev_idx));

  const auto& device_pool = pool_[dev_idx];
  return device_pool->New();
}
// Builds one ResourcePool of CUDA events for every device index in
// [0, platform::GetCUDADeviceCount()). Events are created lazily by the
// per-device pool; nothing is allocated on the GPU here.
CudaEventResourcePool::CudaEventResourcePool() {
  const int device_count = platform::GetCUDADeviceCount();
  pool_.reserve(device_count);
  for (int dev_idx = 0; dev_idx < device_count; ++dev_idx) {
    pool_.emplace_back(ResourcePool<CudaEventObject>::Create(
        // Creator: switch to the owning device, then make an event with
        // timing disabled.
        [dev_idx] {
          platform::SetDeviceId(dev_idx);
          cudaEvent_t event;
          PADDLE_ENFORCE_CUDA_SUCCESS(
              cudaEventCreateWithFlags(&event, cudaEventDisableTiming),
              platform::errors::Fatal(
                  "cudaEventCreateWithFlags raises unexpected exception"));
          return event;
        },
        // Deleter: switch to the owning device before destroying the event.
        [dev_idx](cudaEvent_t event) {
          platform::SetDeviceId(dev_idx);
          PADDLE_ENFORCE_CUDA_SUCCESS(
              cudaEventDestroy(event),
              platform::errors::Fatal(
                  "cudaEventDestroy raises unexpected exception"));
        }));
  }
}
// Returns the process-wide event pool; it is constructed on the first call.
CudaEventResourcePool& CudaEventResourcePool::Instance() {
  static CudaEventResourcePool singleton;
  return singleton;
}
// Hands out an event belonging to device `dev_idx`.
//
// `dev_idx` must satisfy 0 <= dev_idx < pool_.size(); otherwise the enforce
// checks below fail with InvalidArgument / OutOfRange respectively.
// The returned handle comes from the per-device ResourcePool.
std::shared_ptr<CudaEventObject> CudaEventResourcePool::New(int dev_idx) {
  PADDLE_ENFORCE_GE(
      dev_idx, 0,
      platform::errors::InvalidArgument(
          "dev_idx should be not less than 0, but got %d", dev_idx));
  PADDLE_ENFORCE_LT(
      dev_idx, pool_.size(),
      platform::errors::OutOfRange(
          "dev_idx should be less than device count %d, but got %d",
          pool_.size(), dev_idx));

  const auto& device_pool = pool_[dev_idx];
  return device_pool->New();
}
} // namespace platform
} // namespace paddle
#endif
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef PADDLE_WITH_CUDA
#include <cuda.h>
#include <cuda_runtime.h>
#include <memory>
#include <type_traits>
#include <vector>
#include "paddle/fluid/platform/resource_pool.h"
namespace paddle {
namespace platform {
// Pointee types of the CUDA handle typedefs. With these aliases,
// std::shared_ptr<CudaStreamObject>::get() yields a cudaStream_t and
// std::shared_ptr<CudaEventObject>::get() yields a cudaEvent_t.
using CudaStreamObject = std::remove_pointer<cudaStream_t>::type;
using CudaEventObject = std::remove_pointer<cudaEvent_t>::type;
// Singleton that hands out CUDA streams, keeping one ResourcePool per device
// index.
class CudaStreamResourcePool {
 public:
  // Accessor for the single shared instance.
  static CudaStreamResourcePool &Instance();

  // Returns a stream handle for device `dev_idx`.
  std::shared_ptr<CudaStreamObject> New(int dev_idx);

 private:
  // Only reachable through Instance().
  CudaStreamResourcePool();
  DISABLE_COPY_AND_ASSIGN(CudaStreamResourcePool);

  // Per-device pools, indexed by device index.
  std::vector<std::shared_ptr<ResourcePool<CudaStreamObject>>> pool_;
};
// Singleton that hands out CUDA events, keeping one ResourcePool per device
// index.
class CudaEventResourcePool {
 public:
  // Accessor for the single shared instance.
  static CudaEventResourcePool &Instance();

  // Returns an event handle for device `dev_idx`.
  std::shared_ptr<CudaEventObject> New(int dev_idx);

 private:
  // Only reachable through Instance().
  CudaEventResourcePool();
  DISABLE_COPY_AND_ASSIGN(CudaEventResourcePool);

  // Per-device pools, indexed by device index.
  std::vector<std::shared_ptr<ResourcePool<CudaEventObject>>> pool_;
};
} // namespace platform
} // namespace paddle
#endif
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <typeinfo>
#include <utility>
#include <vector>
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/macros.h"
namespace paddle {
namespace platform {
// A mutex-protected pool of reusable raw resources of type T.
//
// New() returns a std::shared_ptr<T> whose deleter does not destroy the
// resource but puts it back into the pool (Restore). Resources are created on
// demand with `creator_` when the pool is empty, and are only destroyed with
// `deleter_` when the pool itself is destroyed.
//
// Every handle returned by New() holds a shared_ptr to the pool, so the pool
// stays alive until the last outstanding handle has been released. For that
// reason a pool can only be built through Create(), which guarantees it is
// owned by a std::shared_ptr.
template <typename T>
class ResourcePool : public std::enable_shared_from_this<ResourcePool<T>> {
 private:
  // Deleter attached to the handles returned by New(): gives the raw pointer
  // back to the owning pool and keeps that pool alive while the handle exists.
  struct ResourceDeleter {
   public:
    explicit ResourceDeleter(ResourcePool<T> *pool)
        : instance_(pool->shared_from_this()) {}

    void operator()(T *ptr) const { instance_->Restore(ptr); }

   private:
    std::shared_ptr<ResourcePool<T>> instance_;
  };

 public:
  // Builds a shared-owned pool.
  //   creator: called to make a new resource when the pool is empty; must not
  //            return nullptr.
  //   deleter: called on every pooled resource when the pool is destroyed.
  static std::shared_ptr<ResourcePool<T>> Create(
      const std::function<T *()> &creator,
      const std::function<void(T *)> &deleter) {
    return std::shared_ptr<ResourcePool<T>>(
        new ResourcePool<T>(creator, deleter));
  }

  // Destroys every resource currently held by the pool.
  // NOTE(review): an exception escaping `deleter_` here would terminate the
  // program, since destructors are noexcept by default.
  ~ResourcePool() {
    for (auto *ptr : instances_) {
      deleter_(ptr);
    }
  }

  // Returns a handle to a resource, reusing a pooled one when available and
  // creating a new one otherwise. Releasing the handle returns the resource
  // to this pool.
  std::shared_ptr<T> New() {
    // Build the deleter before acquiring a resource so that nothing has been
    // taken out of the pool yet if shared_from_this() were to fail.
    ResourceDeleter restorer(this);

    T *obj = nullptr;
    {
      std::lock_guard<std::mutex> guard(mtx_);
      if (instances_.empty()) {
        obj = creator_();
        PADDLE_ENFORCE_NOT_NULL(obj,
                                platform::errors::PermissionDenied(
                                    "The creator should not return nullptr"));
        VLOG(10) << "Create new instance " << TypePtrName();
      } else {
        obj = instances_.back();
        instances_.pop_back();
        VLOG(10) << "Pop new instance " << TypePtrName()
                 << " from pool, size=" << instances_.size();
      }
    }

    // mtx_ must be released here: if the shared_ptr constructor throws, it
    // invokes the deleter on `obj`, which calls Restore() and locks mtx_.
    // Doing that while this thread still held the (non-recursive) mutex would
    // be undefined behaviour.
    return std::shared_ptr<T>(obj, std::move(restorer));
  }

 private:
  // Demangled name of `T *`, used only in log messages.
  static std::string TypePtrName() {
    return platform::demangle(typeid(T *).name());  // NOLINT
  }

  ResourcePool(const std::function<T *()> &creator,
               const std::function<void(T *)> &deleter)
      : creator_(creator), deleter_(deleter) {}

  // Puts a resource back into the pool; called by ResourceDeleter.
  void Restore(T *ptr) {
    std::lock_guard<std::mutex> guard(mtx_);
    instances_.emplace_back(ptr);
    VLOG(10) << "Restore " << TypePtrName()
             << " into pool, size=" << instances_.size();
  }

 private:
  std::vector<T *> instances_;        // idle resources, guarded by mtx_
  std::function<T *()> creator_;      // makes a new resource
  std::function<void(T *)> deleter_;  // destroys a resource
  std::mutex mtx_;                    // protects instances_
};
} // namespace platform
} // namespace paddle
......@@ -580,6 +580,7 @@ class DygraphGeneratorLoader(DataLoaderBase):
self._need_check_feed = []
self._blocking_queue = core.init_lod_tensor_blocking_queue(
core.Variable(), self._capacity, False)
self._reader = None
self._reader = core.create_py_reader(
self.queue, self._var_names, self._shapes, self._dtypes,
self._need_check_feed, self._places, self._use_double_buffer, True)
......@@ -832,6 +833,7 @@ class GeneratorLoader(DataLoaderBase):
]
self._queue = core.init_lod_tensor_blocking_queue(
core.Variable(), self._capacity, self._keep_order)
self._reader = None
self._reader = core.create_py_reader(
self.queue, self._var_names, self._shapes, self._dtypes,
self._need_check_feed, self._places, self._use_double_buffer,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册