Unverified · Commit cab0f2f5 authored by Wilber, committed by GitHub

thread_local method to support predictor stream. (#42785)

Parent cff1ef2b
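A minimal usage sketch of the external-stream API added in this commit; the model directory, memory-pool size, and include path are placeholders, and the pattern follows TEST(Predictor, Streams) further down in this diff:

#include <cuda_runtime.h>
#include <string>
#include "paddle_inference_api.h"  // path depends on how the library is installed

void RunOnExternalStream(const std::string &model_dir) {
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  paddle_infer::Config config;
  config.SetModel(model_dir);                    // placeholder model directory
  config.EnableUseGpu(100 /*MB*/, 0 /*gpu id*/);
  config.SetExecStream(stream);                  // new in this commit
  // config.external_stream_enabled() is now true.

  auto predictor = paddle_infer::CreatePredictor(config);
  // The predictor executes on the user-provided stream; GetExecStream()
  // (also new in this commit) returns the same pointer.
  void *exec_stream = predictor->GetExecStream();
  (void)exec_stream;
}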
......@@ -100,6 +100,24 @@ void AnalysisConfig::EnableUseGpu(uint64_t memory_pool_init_size_mb,
Update();
}
void AnalysisConfig::SetExecStream(void *stream) {
PADDLE_ENFORCE_NOT_NULL(stream, platform::errors::InvalidArgument(
"`stream` should not be nullptr"));
exec_stream_ = stream;
use_external_stream_ = true;
Update();
}
void *AnalysisConfig::GetExecStream() const {
PADDLE_ENFORCE_NOT_NULL(exec_stream_, platform::errors::InvalidArgument(
"`stream` should not be nullptr"));
return exec_stream_;
}
bool AnalysisConfig::external_stream_enabled() const {
return use_external_stream_;
}
void AnalysisConfig::DisableGpu() {
use_gpu_ = false;
......@@ -239,6 +257,8 @@ AnalysisConfig::AnalysisConfig(const AnalysisConfig &other) {
CP_MEMBER(use_fc_padding_);
// GPU related.
CP_MEMBER(use_gpu_);
CP_MEMBER(use_external_stream_);
CP_MEMBER(exec_stream_);
CP_MEMBER(use_cudnn_);
CP_MEMBER(gpu_device_id_);
CP_MEMBER(memory_pool_init_size_mb_);
......@@ -787,6 +807,8 @@ std::string AnalysisConfig::SerializeInfoCache() {
ss << params_file_;
ss << use_gpu_;
ss << use_external_stream_;
ss << exec_stream_;
ss << use_gpu_fp16_;
for (auto &item : gpu_fp16_disabled_op_types_) ss << item;
ss << use_fc_padding_;
......@@ -985,6 +1007,8 @@ std::string AnalysisConfig::Summary() {
os.InsertRow({"gpu_device_id", std::to_string(gpu_device_id_)});
os.InsertRow({"memory_pool_init_size",
std::to_string(memory_pool_init_size_mb_) + "MB"});
os.InsertRow(
{"use_external_stream", use_external_stream_ ? "true" : "false"});
os.InsertRow(
{"thread_local_stream", thread_local_stream_ ? "true" : "false"});
......
......@@ -27,6 +27,7 @@
#include "paddle/fluid//platform/device/gpu/gpu_types.h"
#include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/framework/ir/fuse_pass_base.h"
#include "paddle/fluid/framework/ir/pass.h"
#include "paddle/fluid/framework/naive_executor.h"
......@@ -37,6 +38,7 @@
#include "paddle/fluid/inference/analysis/helper.h"
#include "paddle/fluid/inference/analysis/passes/memory_optimize_pass.h"
#include "paddle/fluid/inference/api/helper.h"
#include "paddle/fluid/inference/api/infer_context.h"
#include "paddle/fluid/inference/api/paddle_inference_api.h"
#include "paddle/fluid/inference/api/paddle_inference_pass.h"
#include "paddle/fluid/inference/utils/io_utils.h"
......@@ -198,6 +200,9 @@ bool AnalysisPredictor::Init(
if (!PrepareScope(parent_scope)) {
return false;
}
InitPlace();
if (!CreateExecutor()) {
return false;
}
......@@ -213,56 +218,32 @@ bool AnalysisPredictor::Init(
return true;
}
return true;
}
bool AnalysisPredictor::PrepareScope(
const std::shared_ptr<framework::Scope> &parent_scope) {
if (parent_scope) {
PADDLE_ENFORCE_NOT_NULL(
parent_scope,
platform::errors::PreconditionNotMet(
"Both program and parent_scope should be set in Clone mode."));
scope_ = parent_scope;
status_is_cloned_ = true;
} else {
paddle::framework::InitDevices();
paddle::framework::InitDefaultKernelSignatureMap();
// TODO(wilber): we need to release memory occupied by weights.
scope_.reset(new paddle::framework::Scope());
status_is_cloned_ = false;
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
// TODO(inference): Currently only GPU with an external stream supports a
// private device_context.
if (config_.use_gpu_ && config_.use_external_stream_) {
private_context_ = true;
}
if (private_context_) {
if (!status_is_cloned_) {
predictor_stream_ = config_.GetExecStream();
}
// NOTE: If the external stream equals the global device context's stream,
// then fall back.
auto global_stream =
static_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
if (predictor_stream_ != global_stream) {
InitResourceManager(predictor_stream_);
InitDeviceContexts();
}
}
sub_scope_ = &scope_->NewScope();
#endif
return true;
}
bool AnalysisPredictor::PrepareProgram(
const std::shared_ptr<framework::ProgramDesc> &program) {
if (!program) {
if (!LoadProgramDesc()) return false;
// If not cloned, the parameters should be loaded.
// If config_.ir_optim() is True, parameters are loaded in
// OptimizeInferenceProgram(), but other persistable variables
// (like RAW type vars) are not created in the scope.
// If config_.ir_optim() is False, parameters are loaded in LoadParameters(),
// and other persistable variables still need to be created.
// So in both cases, create the persistable variables first.
executor_->CreateVariables(*inference_program_, 0, true, sub_scope_);
// If enable_ir_optim_ is false, the analysis passes (op fuse, graph analysis,
// trt subgraph, mkldnn, etc.) will not be executed.
OptimizeInferenceProgram();
} else {
// If the program is passed from external, no need to optimize it, this
// logic is used in the clone scenario.
inference_program_ = program;
}
executor_->CreateVariables(*inference_program_, 0, false, sub_scope_);
return true;
}
bool AnalysisPredictor::CreateExecutor() {
void AnalysisPredictor::InitPlace() {
if (config_.use_gpu()) {
PADDLE_ENFORCE_EQ(config_.use_xpu(), false,
platform::errors::InvalidArgument(
......@@ -345,6 +326,160 @@ bool AnalysisPredictor::CreateExecutor() {
} else {
place_ = paddle::platform::CPUPlace();
}
}
void AnalysisPredictor::InitResourceManager(void *stream) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
predictor_stream_ =
ResourceManager::Instance().InitGPUResource(place_, stream);
#endif
}
void AnalysisPredictor::InitDeviceContexts() {
// Init GPUContext.
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
if (place_.GetType() == phi::AllocationType::GPU) {
device_contexts_.emplace(
place_, std::async(std::launch::deferred, [=] {
auto *gpu_resource =
ResourceManager::Instance().GetGPUResource(predictor_stream_);
auto *gpu_context = new InferGPUContext();
gpu_context->SetAllocator(
memory::allocation::AllocatorFacade::Instance()
.GetAllocator(place_, gpu_resource->GetStream())
.get());
gpu_context->SetPinnedAllocator(
memory::allocation::AllocatorFacade::Instance()
.GetAllocator(paddle::platform::CUDAPinnedPlace())
.get());
gpu_context->SetHostAllocator(
memory::allocation::AllocatorFacade::Instance()
.GetAllocator(platform::CPUPlace())
.get());
gpu_context->SetZeroAllocator(
memory::allocation::AllocatorFacade::Instance()
.GetZeroAllocator(place_)
.get());
gpu_context->SetGenerator(
framework::DefaultCUDAGenerator(place_.GetDeviceId()).get());
gpu_context->SetHostGenerator(framework::DefaultCPUGenerator().get());
gpu_context->SetStream(gpu_resource->GetStream());
gpu_context->SetBlasHandle(gpu_resource->GetBlasHandle());
gpu_context->SetBlasTensorCoreHandle(
gpu_resource->GetBlasTensorCoreHandle());
gpu_context->SetBlasTF32Handle(gpu_resource->GetBlasTF32Handle());
gpu_context->SetDnnHandle(gpu_resource->GetDnnHandle());
gpu_context->SetSolverHandle(gpu_resource->GetSolverDnHandle());
gpu_context->SetSparseHandle(gpu_resource->GetSparseHandle());
gpu_context->SetEigenDevice(gpu_resource->GetGpuEigenDevice());
gpu_context->SetComputeCapability(
gpu_resource->GetGpuComputeCapability());
gpu_context->SetMaxThreadsPerBlock(
gpu_resource->GetGpuMaxThreadsPerBlock());
gpu_context->SetMaxThreadsPerMultiProcessor(
gpu_resource->GetGpuMaxThreadsPerMp());
gpu_context->SetMaxGridDimSize(gpu_resource->GetGpuMaxGridDimSize());
gpu_context->SetMultiProcessors(
gpu_resource->GetGPUMultiProcessors());
gpu_context->SetDriverVersion(gpu_resource->GetGpuDriverVersion());
gpu_context->SetRuntimeVersion(gpu_resource->GetGpuRuntimeVersion());
VLOG(1) << "thread id is " << std::this_thread::get_id()
<< ", stream id is "
<< reinterpret_cast<void *>(gpu_resource->GetStream())
<< ", allotor ptr is "
<< reinterpret_cast<void *>(
memory::allocation::AllocatorFacade::Instance()
.GetAllocator(place_, gpu_resource->GetStream())
.get());
return std::unique_ptr<phi::DeviceContext>(gpu_context);
}));
}
#endif
// TODO(Inference): Support other backends.
}
void *AnalysisPredictor::GetExecStream() const {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
if (place_.GetType() == phi::AllocationType::GPU) {
if (private_context_) {
return predictor_stream_;
} else {
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
return reinterpret_cast<const phi::GPUContext *>(pool.Get(place_))
->stream();
}
} else {
return nullptr;
}
return nullptr;
#else
// TODO(inference): Support other backends.
return nullptr;
#endif
}
const void *AnalysisPredictor::GetDeviceContexts() const {
if (private_context_) {
return &device_contexts_;
} else {
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
const auto &dev_ctxs = pool.device_contexts();
return &dev_ctxs;
}
}
bool AnalysisPredictor::PrepareScope(
const std::shared_ptr<framework::Scope> &parent_scope) {
if (parent_scope) {
PADDLE_ENFORCE_NOT_NULL(
parent_scope,
platform::errors::PreconditionNotMet(
"Both program and parent_scope should be set in Clone mode."));
scope_ = parent_scope;
status_is_cloned_ = true;
} else {
paddle::framework::InitDevices();
paddle::framework::InitDefaultKernelSignatureMap();
// TODO(wilber): we need to release memory occupied by weights.
scope_.reset(new paddle::framework::Scope());
status_is_cloned_ = false;
}
sub_scope_ = &scope_->NewScope();
return true;
}
bool AnalysisPredictor::PrepareProgram(
const std::shared_ptr<framework::ProgramDesc> &program) {
if (!program) {
if (!LoadProgramDesc()) return false;
// If not cloned, the parameters should be loaded.
// If config_.ir_optim() is True, parameters are loaded in
// OptimizeInferenceProgram(), but other persistable variables
// (like RAW type vars) are not created in the scope.
// If config_.ir_optim() is False, parameters are loaded in LoadParameters(),
// and other persistable variables still need to be created.
// So in both cases, create the persistable variables first.
executor_->CreateVariables(*inference_program_, 0, true, sub_scope_);
// If enable_ir_optim_ is false, the analysis passes (op fuse, graph analysis,
// trt subgraph, mkldnn, etc.) will not be executed.
OptimizeInferenceProgram();
} else {
// If the program is passed from external, no need to optimize it, this
// logic is used in the clone scenario.
inference_program_ = program;
}
executor_->CreateVariables(*inference_program_, 0, false, sub_scope_);
return true;
}
bool AnalysisPredictor::CreateExecutor() {
executor_.reset(new paddle::framework::NaiveExecutor(place_));
return true;
}
......@@ -1222,8 +1357,8 @@ std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetInputTensor(
platform::errors::PreconditionNotMet(
"The variable named %s is not found in the scope of the executor.",
name));
std::unique_ptr<ZeroCopyTensor> res(
new ZeroCopyTensor(static_cast<void *>(scope)));
std::unique_ptr<ZeroCopyTensor> res(new ZeroCopyTensor(
static_cast<void *>(scope), this->GetDeviceContexts()));
res->input_or_output_ = true;
res->SetName(name);
if (platform::is_cpu_place(place_)) {
......@@ -1277,8 +1412,8 @@ std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetOutputTensor(
platform::errors::PreconditionNotMet(
"The variable named %s is not found in the scope of the executor.",
name));
std::unique_ptr<ZeroCopyTensor> res(
new ZeroCopyTensor(static_cast<void *>(scope)));
std::unique_ptr<ZeroCopyTensor> res(new ZeroCopyTensor(
static_cast<void *>(scope), this->GetDeviceContexts()));
res->input_or_output_ = false;
res->SetName(name);
if (platform::is_cpu_place(place_)) {
......@@ -1327,6 +1462,9 @@ bool AnalysisPredictor::ZeroCopyRun() {
return true;
}
#endif
if (private_context_) {
paddle::platform::DeviceContextPool::SetDeviceContexts(&device_contexts_);
}
paddle::platform::SetNumThreads(config_.cpu_math_library_num_threads());
#ifdef PADDLE_WITH_MKLDNN
if (config_.use_mkldnn_) {
......@@ -1352,6 +1490,9 @@ bool AnalysisPredictor::ZeroCopyRun() {
// recover the cpu_math_library_num_threads to 1, in order to avoid thread
// conflict when integrating it into deployment service.
paddle::platform::SetNumThreads(1);
if (private_context_) {
paddle::platform::DeviceContextPool::SetDeviceContexts(nullptr);
}
#ifdef PADDLE_WITH_MKLDNN
if (config_.use_mkldnn_) MkldnnPostReset();
#endif
......@@ -1659,15 +1800,31 @@ AnalysisPredictor::~AnalysisPredictor() {
if (config_.shape_range_info_collected()) {
StatisticShapeRangeInfo();
}
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
if (predictor_stream_ != nullptr) {
ResourceManager::Instance().DestroyGPUResource(predictor_stream_);
}
#endif
if (place_.GetType() != phi::AllocationType::UNDEFINED) {
memory::Release(place_);
}
device_contexts_.clear();
}
std::unique_ptr<PaddlePredictor> AnalysisPredictor::Clone() {
std::unique_ptr<PaddlePredictor> AnalysisPredictor::Clone(void *stream) {
std::lock_guard<std::mutex> lk(clone_mutex_);
auto *x = new AnalysisPredictor(config_);
x->status_is_cloned_ = true;
if (config_.use_external_stream_ && stream == nullptr) {
PADDLE_THROW(platform::errors::InvalidArgument(
"config has been configured to use external stream, but the Clone "
"function has not received a valid stream parameter."));
} else if (!config_.use_external_stream_ && stream != nullptr) {
PADDLE_THROW(platform::errors::InvalidArgument(
"config has not been configured to use external stream, but the Clone "
"function has received a stream parameter."));
}
x->predictor_stream_ = stream;
x->Init(scope_, inference_program_);
x->executor_->ResetTrtOps(++AnalysisPredictor::clone_num_);
return std::unique_ptr<PaddlePredictor>(x);
......@@ -1853,8 +2010,8 @@ std::unique_ptr<Tensor> Predictor::GetOutputHandle(const std::string &name) {
bool Predictor::Run() { return predictor_->ZeroCopyRun(); }
std::unique_ptr<Predictor> Predictor::Clone() {
auto analysis_pred = predictor_->Clone();
std::unique_ptr<Predictor> Predictor::Clone(void *stream) {
auto analysis_pred = predictor_->Clone(stream);
std::unique_ptr<Predictor> pred(new Predictor(std::move(analysis_pred)));
return pred;
}
......@@ -1865,6 +2022,8 @@ void Predictor::ClearIntermediateTensor() {
uint64_t Predictor::TryShrinkMemory() { return predictor_->TryShrinkMemory(); }
void *Predictor::GetExecStream() const { return predictor_->GetExecStream(); }
int GetNumBytesOfDataType(DataType dtype) {
switch (dtype) {
case DataType::FLOAT32:
......
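A condensed illustration of the Clone contract implemented above: a predictor configured with an external stream must also be cloned with a stream (and a predictor without one must not be); the function and stream names below are illustrative only:

#include <cuda_runtime.h>
#include <memory>
#include "paddle_inference_api.h"  // path depends on how the library is installed

// Assumes 'predictor' was created from a Config that called SetExecStream(...).
std::unique_ptr<paddle_infer::Predictor> CloneOntoNewStream(
    paddle_infer::Predictor *predictor) {
  cudaStream_t stream2;
  cudaStreamCreate(&stream2);
  return predictor->Clone(stream2);  // OK: the clone runs on stream2
  // predictor->Clone() here would throw InvalidArgument, because the config
  // was set up with an external stream but Clone() received none.
}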
......@@ -28,6 +28,7 @@
#include "paddle/fluid/inference/api/details/reset_tensor_array.h"
#include "paddle/fluid/inference/api/helper.h"
#include "paddle/fluid/inference/api/paddle_inference_api.h"
#include "paddle/fluid/inference/api/resource_manager.h"
#include "paddle/fluid/platform/device/gpu/gpu_types.h"
#include "paddle/fluid/platform/float16.h"
#include "paddle/fluid/string/printf.h"
......@@ -184,6 +185,14 @@ class AnalysisPredictor : public PaddlePredictor {
bool ExpRunWithExternalStream(const gpuStream_t stream);
#endif
///
/// \brief Get the execution stream on devices that have a stream concept;
/// returns nullptr otherwise.
///
/// \return The execution stream, or nullptr (CPU).
///
void *GetExecStream() const override;
///
/// \brief Create feed fetch variables
///
......@@ -235,7 +244,7 @@ class AnalysisPredictor : public PaddlePredictor {
///
/// \return get a new predictor
///
std::unique_ptr<PaddlePredictor> Clone() override;
std::unique_ptr<PaddlePredictor> Clone(void *stream = nullptr) override;
///
/// \brief Get the scope used by predictor
///
......@@ -393,10 +402,17 @@ class AnalysisPredictor : public PaddlePredictor {
FRIEND_TEST(AnalysisPredictor, with_gpu);
#endif
protected:
const void *GetDeviceContexts() const override;
private:
void StatisticShapeRangeInfo();
void CollectShapeRangeInfo();
void InitPlace();
void InitDeviceContexts();
void InitResourceManager(void *stream);
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
// fleet exe related
......@@ -489,6 +505,11 @@ class AnalysisPredictor : public PaddlePredictor {
std::map<std::string, std::vector<std::vector<int32_t>>> shape_info_;
static int clone_num_;
bool private_context_{false};
void *predictor_stream_{nullptr};
std::map<phi::Place, std::shared_future<std::unique_ptr<phi::DeviceContext>>>
device_contexts_;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
// fleet executor related
distributed::FleetExecutorDesc executor_desc_;
......
......@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/inference/api/analysis_predictor.h"
#include "paddle/fluid/inference/api/resource_manager.h"
#if defined(PADDLE_WITH_CUDA)
#include <cuda_runtime.h>
#endif
......@@ -183,18 +184,11 @@ TEST(AnalysisPredictor, CollectShapeRangeInfo) {
w1->Reshape({4, 1});
w2->Reshape({4, 1});
w3->Reshape({4, 1});
auto* w0_data = w0->mutable_data<int64_t>(PaddlePlace::kCPU);
auto* w1_data = w1->mutable_data<int64_t>(PaddlePlace::kCPU);
auto* w2_data = w2->mutable_data<int64_t>(PaddlePlace::kCPU);
auto* w3_data = w3->mutable_data<int64_t>(PaddlePlace::kCPU);
for (int i = 0; i < 4; i++) {
w0_data[i] = i;
w1_data[i] = i;
w2_data[i] = i;
w3_data[i] = i;
}
std::vector<int64_t> input_data{0, 1, 2, 3};
w0->copy_from_cpu(input_data.data());
w1->copy_from_cpu(input_data.data());
w2->copy_from_cpu(input_data.data());
w3->copy_from_cpu(input_data.data());
predictor->ZeroCopyRun();
......@@ -539,6 +533,103 @@ TEST(Tensor, GpuShareExternalData) {
LOG(INFO) << "output size: " << size / sizeof(float);
predictor->TryShrinkMemory();
}
TEST(Predictor, Streams) {
// internal stream.
{
Config config;
config.SetModel(FLAGS_dirname);
config.EnableUseGpu(100, 0);
auto predictor = CreatePredictor(config);
gpuStream_t stream =
reinterpret_cast<gpuStream_t>(predictor->GetExecStream());
CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 0);
}
// internal stream, create 2 predictor.
{
Config config1;
config1.SetModel(FLAGS_dirname);
config1.EnableUseGpu(100, 0);
auto predictor1 = CreatePredictor(config1);
gpuStream_t stream1 =
reinterpret_cast<gpuStream_t>(predictor1->GetExecStream());
CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream1), 0);
Config config2;
config2.SetModel(FLAGS_dirname);
config2.EnableUseGpu(100, 0);
auto predictor2 = CreatePredictor(config2);
gpuStream_t stream2 =
reinterpret_cast<gpuStream_t>(predictor2->GetExecStream());
CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream2), 0);
CHECK_EQ(stream1, stream2);
}
// internal stream, clone
{
Config config;
config.SetModel(FLAGS_dirname);
config.EnableUseGpu(100, 0);
auto predictor = CreatePredictor(config);
gpuStream_t stream =
reinterpret_cast<gpuStream_t>(predictor->GetExecStream());
CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 0);
auto predictor2 = predictor->Clone();
gpuStream_t stream2 =
reinterpret_cast<gpuStream_t>(predictor2->GetExecStream());
CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream2), 0);
CHECK_EQ(stream, stream2);
}
// external stream
{
cudaStream_t external_stream;
cudaStreamCreate(&external_stream);
Config config;
config.SetModel(FLAGS_dirname);
config.EnableUseGpu(100, 0);
config.SetExecStream(external_stream);
CHECK_EQ(config.external_stream_enabled(), true);
auto predictor = CreatePredictor(config);
gpuStream_t stream =
reinterpret_cast<gpuStream_t>(predictor->GetExecStream());
CHECK_EQ(external_stream, stream);
CHECK_NOTNULL(paddle::ResourceManager::Instance().GetGPUResource(stream));
CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 1);
}
// 2 predictor on 2 stream
{
cudaStream_t external_stream;
cudaStreamCreate(&external_stream);
Config config;
config.SetModel(FLAGS_dirname);
config.EnableUseGpu(100, 0);
config.SetExecStream(external_stream);
auto predictor = CreatePredictor(config);
gpuStream_t stream =
reinterpret_cast<gpuStream_t>(predictor->GetExecStream());
CHECK_NOTNULL(paddle::ResourceManager::Instance().GetGPUResource(stream));
CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 1);
cudaStream_t external_stream2;
cudaStreamCreate(&external_stream2);
Config config2;
config2.SetModel(FLAGS_dirname);
config2.EnableUseGpu(100, 0);
config2.SetExecStream(external_stream2);
auto predictor2 = CreatePredictor(config2);
gpuStream_t stream2 =
reinterpret_cast<gpuStream_t>(predictor2->GetExecStream());
CHECK_NOTNULL(paddle::ResourceManager::Instance().GetGPUResource(stream2));
CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream2), 1);
CHECK_NE(stream, stream2);
}
}
#endif
} // namespace paddle_infer
......@@ -181,7 +181,7 @@ bool NativePaddlePredictor::Run(const std::vector<PaddleTensor> &inputs,
return true;
}
std::unique_ptr<PaddlePredictor> NativePaddlePredictor::Clone() {
std::unique_ptr<PaddlePredictor> NativePaddlePredictor::Clone(void *stream) {
std::lock_guard<std::mutex> lk(clone_mutex_);
VLOG(3) << "Predictor::clone";
std::unique_ptr<PaddlePredictor> cls(new NativePaddlePredictor(config_));
......
......@@ -51,7 +51,7 @@ class NativePaddlePredictor : public PaddlePredictor {
std::vector<PaddleTensor> *output_data,
int batch_size = -1) override;
std::unique_ptr<PaddlePredictor> Clone() override;
std::unique_ptr<PaddlePredictor> Clone(void *stream = nullptr) override;
~NativePaddlePredictor() override;
......
......@@ -46,7 +46,9 @@ class DemoPredictor : public PaddlePredictor {
return false;
}
std::unique_ptr<PaddlePredictor> Clone() override { return nullptr; }
std::unique_ptr<PaddlePredictor> Clone(void *stream = nullptr) override {
return nullptr;
}
~DemoPredictor() override {}
};
......
......@@ -94,7 +94,17 @@ T *Tensor::mutable_data(PlaceType place) {
return tensor->mutable_data<T>(paddle::platform::CPUPlace());
}
case static_cast<int>(PlaceType::kGPU): {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
paddle::platform::CUDAPlace gpu_place(device_);
auto *dev_ctxs = reinterpret_cast<const std::map<
phi::Place, std::shared_future<std::unique_ptr<phi::DeviceContext>>>
*>(device_contexs_);
auto *dev_ctx =
static_cast<phi::GPUContext *>(dev_ctxs->at(gpu_place).get().get());
return dev_ctx->Alloc<T>(tensor, tensor->numel() * sizeof(T));
#else
return tensor->mutable_data<T>(paddle::platform::CUDAPlace(device_));
#endif
}
case static_cast<int>(PlaceType::kXPU): {
return tensor->mutable_data<T>(paddle::platform::XPUPlace(device_));
......@@ -181,12 +191,14 @@ void Tensor::CopyFromCpu(const T *data) {
std::memcpy(static_cast<void *>(t_data), data, ele_size);
} else if (place_ == PlaceType::kGPU) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
paddle::platform::CUDAPlace gpu_place(device_);
auto *t_data = tensor->mutable_data<T>(gpu_place);
auto *dev_ctx = static_cast<const paddle::platform::CUDADeviceContext *>(
pool.Get(gpu_place));
auto *dev_ctxs = reinterpret_cast<const std::map<
phi::Place, std::shared_future<std::unique_ptr<phi::DeviceContext>>> *>(
device_contexs_);
auto *dev_ctx =
static_cast<phi::GPUContext *>(dev_ctxs->at(gpu_place).get().get());
auto *t_data = dev_ctx->Alloc<T>(tensor, tensor->numel() * sizeof(T));
paddle::memory::Copy(gpu_place, static_cast<void *>(t_data),
paddle::platform::CPUPlace(), data, ele_size,
......@@ -359,11 +371,12 @@ void Tensor::CopyToCpuImpl(T *data, void *exec_stream, CallbackFunc cb,
#endif
} else if (place_ == PlaceType::kGPU) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
auto gpu_place = t_place;
auto *dev_ctx = static_cast<const paddle::platform::CUDADeviceContext *>(
pool.Get(gpu_place));
auto *dev_ctxs = reinterpret_cast<const std::map<
phi::Place, std::shared_future<std::unique_ptr<phi::DeviceContext>>> *>(
device_contexs_);
auto *dev_ctx =
static_cast<phi::GPUContext *>(dev_ctxs->at(gpu_place).get().get());
paddle::memory::Copy(paddle::platform::CPUPlace(),
static_cast<void *>(data), gpu_place, t_data,
ele_num * sizeof(T), dev_ctx->stream());
......@@ -547,7 +560,8 @@ template PD_INFER_DECL uint8_t *Tensor::mutable_data<uint8_t>(PlaceType place);
template PD_INFER_DECL int8_t *Tensor::mutable_data<int8_t>(PlaceType place);
template PD_INFER_DECL float16 *Tensor::mutable_data<float16>(PlaceType place);
Tensor::Tensor(void *scope) : scope_{scope} {}
Tensor::Tensor(void *scope, const void *device_contexts)
: scope_{scope}, device_contexs_(device_contexts) {}
template <typename T>
void *Tensor::FindTensor() const {
......
......@@ -25,14 +25,19 @@
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/inference/api/helper.h"
#include "paddle/fluid/inference/api/paddle_tensor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
namespace paddle_infer {
struct TensorWrapper : public Tensor {
TensorWrapper(paddle_infer::PlaceType place, paddle::framework::Scope* scope,
const std::string& name)
: Tensor{static_cast<void*>(scope)} {
TensorWrapper(
paddle_infer::PlaceType place, paddle::framework::Scope* scope,
const std::map<phi::Place,
std::shared_future<std::unique_ptr<phi::DeviceContext>>>*
dev_ctxs,
const std::string& name)
: Tensor{static_cast<void*>(scope), dev_ctxs} {
SetPlace(place, 0 /*device_id*/);
SetName(name);
input_or_output_ = true;
......@@ -42,7 +47,11 @@ struct TensorWrapper : public Tensor {
std::unique_ptr<Tensor> CreateTensor(paddle_infer::PlaceType place,
paddle::framework::Scope* scope,
const std::string& name) {
return std::unique_ptr<Tensor>(new TensorWrapper{place, scope, name});
paddle::platform::DeviceContextPool& pool =
paddle::platform::DeviceContextPool::Instance();
const auto& dev_ctxs = pool.device_contexts();
return std::unique_ptr<Tensor>(
new TensorWrapper{place, scope, &dev_ctxs, name});
}
template <typename T>
......
......@@ -243,7 +243,7 @@ std::unique_ptr<ZeroCopyTensor> ONNXRuntimePredictor::GetInputTensor(
"The in variable named %s is not found in the "
"ONNXPredictor.",
name));
std::unique_ptr<ZeroCopyTensor> res(new ZeroCopyTensor(nullptr));
std::unique_ptr<ZeroCopyTensor> res(new ZeroCopyTensor(nullptr, this));
res->input_or_output_ = true;
res->SetName(name);
if (platform::is_cpu_place(place_)) {
......@@ -264,7 +264,7 @@ std::unique_ptr<ZeroCopyTensor> ONNXRuntimePredictor::GetOutputTensor(
"The out variable named %s is not found in the "
"ONNXPredictor.",
name));
std::unique_ptr<ZeroCopyTensor> res(new ZeroCopyTensor(nullptr));
std::unique_ptr<ZeroCopyTensor> res(new ZeroCopyTensor(nullptr, this));
res->input_or_output_ = false;
res->SetName(name);
if (platform::is_cpu_place(place_)) {
......@@ -309,7 +309,7 @@ bool ONNXRuntimePredictor::ZeroCopyRun() {
return true;
}
std::unique_ptr<PaddlePredictor> ONNXRuntimePredictor::Clone() {
std::unique_ptr<PaddlePredictor> ONNXRuntimePredictor::Clone(void *stream) {
LOG(ERROR) << "Not support Clone(), Please create new Predictor";
return nullptr;
}
......@@ -325,4 +325,14 @@ ONNXRuntimePredictor::~ONNXRuntimePredictor() {
memory::Release(place_);
}
const void *ONNXRuntimePredictor::GetDeviceContexts() const {
// TODO(inference): Support private device contexts.
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
const auto &dev_ctxs = pool.device_contexts();
return &const_cast<std::map<
phi::Place, std::shared_future<std::unique_ptr<phi::DeviceContext>>> &>(
dev_ctxs);
}
} // namespace paddle
......@@ -174,7 +174,10 @@ class ONNXRuntimePredictor : public PaddlePredictor {
///
/// \return get a new predictor
///
std::unique_ptr<PaddlePredictor> Clone() override;
std::unique_ptr<PaddlePredictor> Clone(void *stream = nullptr) override;
protected:
const void *GetDeviceContexts() const override;
private:
///
......
......@@ -585,6 +585,25 @@ struct PD_INFER_DECL AnalysisConfig {
///
bool trt_allow_build_at_runtime();
///
/// \brief Set the execution stream. If not set, a stream will be created
/// internally.
///
void SetExecStream(void* stream);
///
/// \brief Get the execution stream. The user needs to explicitly cast it to a
/// stream type such as cudaStream_t, hipStream_t, etc.
///
void* GetExecStream() const;
///
/// \brief Whether an external stream is used. If true, the predictor clone
/// operation must also use an external stream; otherwise the framework
/// manages the stream internally.
///
bool external_stream_enabled() const;
///
/// \brief Collect shape info of all tensors in compute graph.
///
......@@ -926,6 +945,8 @@ struct PD_INFER_DECL AnalysisConfig {
"matrix_nms"};
bool use_cudnn_{false};
bool use_external_stream_{false};
void* exec_stream_{nullptr};
// NPU related
bool use_npu_{false};
......
......@@ -195,7 +195,8 @@ class PD_INFER_DECL ZeroCopyTensor : public paddle_infer::Tensor {
private:
friend class AnalysisPredictor;
friend class ONNXRuntimePredictor;
explicit ZeroCopyTensor(void* scope) : paddle_infer::Tensor{scope} {}
explicit ZeroCopyTensor(void* scope, const void* device_contexts)
: paddle_infer::Tensor{scope, device_contexts} {}
};
/// \brief A Predictor for executing inference on a model.
......@@ -286,7 +287,7 @@ class PD_INFER_DECL PaddlePredictor {
/// When using clone, the same network will be created,
/// and the parameters between them are shared.
/// \return unique_ptr which contains the pointer of predictor
virtual std::unique_ptr<PaddlePredictor> Clone() = 0;
virtual std::unique_ptr<PaddlePredictor> Clone(void* stream = nullptr) = 0;
/// \brief Destroy the Predictor.
virtual ~PaddlePredictor() = default;
......@@ -300,6 +301,11 @@ class PD_INFER_DECL PaddlePredictor {
struct Config {
std::string model_dir; /*!< path to the model directory. */
};
virtual void* GetExecStream() const { return nullptr; }
protected:
virtual const void* GetDeviceContexts() const { return nullptr; }
};
///
......
......@@ -133,7 +133,7 @@ class PD_INFER_DECL Predictor {
///
/// \return get a new predictor
///
std::unique_ptr<Predictor> Clone();
std::unique_ptr<Predictor> Clone(void* stream = nullptr);
/// \brief Clear the intermediate tensors of the predictor
void ClearIntermediateTensor();
......@@ -149,6 +149,14 @@ class PD_INFER_DECL Predictor {
///
uint64_t TryShrinkMemory();
///
/// \brief Get the execution stream on devices that have a stream concept;
/// returns nullptr otherwise.
///
/// \return The execution stream, or nullptr (CPU).
///
void* GetExecStream() const;
private:
std::unique_ptr<paddle::PaddlePredictor> predictor_;
friend class paddle_infer::experimental::InternalUtils;
......
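For multi-threaded serving, the pattern exercised by the multi_stream_thread4 test later in this diff is one stream and one cloned predictor per worker thread; a sketch under that assumption (the helper name and include path are illustrative):

#include <cuda_runtime.h>
#include <memory>
#include <vector>
#include "paddle_inference_api.h"  // path depends on how the library is installed

// Build one predictor per worker, each bound to its own CUDA stream.
std::vector<std::shared_ptr<paddle_infer::Predictor>> MakeWorkers(
    paddle_infer::Config config, int num_threads) {
  std::vector<cudaStream_t> streams(num_threads);
  for (auto &s : streams) cudaStreamCreate(&s);

  config.SetExecStream(streams[0]);
  std::shared_ptr<paddle_infer::Predictor> main_predictor =
      paddle_infer::CreatePredictor(config);

  std::vector<std::shared_ptr<paddle_infer::Predictor>> workers;
  for (int i = 1; i < num_threads; ++i) {
    // Each clone is bound to its own stream and can run on its own thread.
    workers.push_back(main_predictor->Clone(streams[i]));
  }
  workers.push_back(main_predictor);  // the original keeps streams[0]
  return workers;
}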
......@@ -162,7 +162,7 @@ class PD_INFER_DECL Tensor {
PlaceType place() const;
protected:
explicit Tensor(void* scope);
explicit Tensor(void* scope, const void* device_contexs);
template <typename T>
void* FindTensor() const;
......@@ -181,6 +181,7 @@ class PD_INFER_DECL Tensor {
DataType dtype_;
bool input_or_output_;
void* scope_{nullptr};
const void* device_contexs_{nullptr};
PlaceType place_;
int device_;
......
......@@ -32,7 +32,10 @@ class InferApiTesterUtils {
const std::string &name, PlaceType place, void *p_scope) {
auto var = static_cast<paddle::framework::Scope *>(p_scope)->Var(name);
var->GetMutable<paddle::framework::LoDTensor>();
std::unique_ptr<Tensor> res(new Tensor(p_scope));
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
const auto &dev_ctxs = pool.device_contexts();
std::unique_ptr<Tensor> res(new Tensor(p_scope, &dev_ctxs));
res->input_or_output_ = true;
res->SetName(name);
res->SetPlace(place, 0 /*device id*/);
......
......@@ -13,6 +13,9 @@
// limitations under the License.
#include "test_suite.h" // NOLINT
#ifdef PADDLE_WITH_GPU
#include <cuda_runtime.h>
#endif
DEFINE_string(modeldir, "", "Directory of the inference model.");
......@@ -170,6 +173,70 @@ TEST(tensorrt_tester_LeViT, multi_thread4_trt_fp32_bz2) {
std::cout << "finish multi-thread test" << std::endl;
}
#ifdef PADDLE_WITH_GPU
TEST(tensorrt_tester_LeViT, multi_stream_thread4_trt_fp32_bz2) {
int thread_num = 4;
// init stream
std::vector<cudaStream_t> streams(thread_num);
for (size_t i = 0; i < thread_num; ++i) {
cudaStreamCreate(&streams[i]);
}
// init input data
std::map<std::string, paddle::test::Record> my_input_data_map;
my_input_data_map["x"] = PrepareInput(2);
// init output data
std::map<std::string, paddle::test::Record> infer_output_data,
truth_output_data;
// prepare groundtruth config
paddle_infer::Config config, config_no_ir;
config_no_ir.SetModel(FLAGS_modeldir + "/inference.pdmodel",
FLAGS_modeldir + "/inference.pdiparams");
config_no_ir.SwitchIrOptim(false);
// prepare inference config
config.SetModel(FLAGS_modeldir + "/inference.pdmodel",
FLAGS_modeldir + "/inference.pdiparams");
config.EnableUseGpu(100, 0);
config.EnableTensorRtEngine(
1 << 20, 2, 50, paddle_infer::PrecisionType::kFloat32, false, false);
// get groundtruth by disabling ir
paddle_infer::services::PredictorPool pred_pool_no_ir(config_no_ir, 1);
SingleThreadPrediction(pred_pool_no_ir.Retrive(0), &my_input_data_map,
&truth_output_data, 1);
// get infer results from multi threads
std::vector<std::thread> threads;
config.SetExecStream(streams[0]);
config.pass_builder()->DeletePass("add_support_int8_pass");
auto main_predictor = CreatePredictor(config);
std::vector<decltype(main_predictor)> predictors;
for (size_t i = 0; i < thread_num - 1; ++i) {
predictors.push_back(std::move(main_predictor->Clone(streams[i + 1])));
LOG(INFO) << "predictors[" << i << "] stream is "
<< predictors[i]->GetExecStream();
}
predictors.push_back(std::move(main_predictor));
LOG(INFO) << "predictors[" << thread_num - 1 << "] stream is "
<< predictors[thread_num - 1]->GetExecStream();
for (int i = 0; i < thread_num; ++i) {
threads.emplace_back(paddle::test::SingleThreadPrediction,
predictors[i].get(), &my_input_data_map,
&infer_output_data, 10);
}
// thread join & check outputs
for (int i = 0; i < thread_num; ++i) {
LOG(INFO) << "join tid : " << i;
threads[i].join();
// CompareRecord(&truth_output_data, &infer_output_data);
}
std::cout << "finish multi-thread test" << std::endl;
}
#endif
} // namespace paddle_infer
int main(int argc, char** argv) {
......
......@@ -129,11 +129,22 @@ DeviceType Place2DeviceType(const platform::Place& place) {
}
DeviceContextPool* DeviceContextPool::pool = nullptr;
thread_local const std::map<Place,
std::shared_future<std::unique_ptr<DeviceContext>>>*
DeviceContextPool::external_device_contexts_ = nullptr;
platform::DeviceContext* DeviceContextPool::Get(const platform::Place& place) {
VLOG(6) << "DeviceContextPool Get: " << place;
auto it = device_contexts_.find(place);
if (it == device_contexts_.end()) {
const std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
ptr;
if (external_device_contexts_ && external_device_contexts_->count(place)) {
ptr = external_device_contexts_;
} else {
ptr = &device_contexts_;
}
auto it = ptr->find(place);
if (it == ptr->end()) {
PADDLE_THROW(platform::errors::Unimplemented(
"Place %s is not supported. Please check that your paddle compiles "
"with WITH_GPU, WITH_XPU, WITH_IPU, WITH_MLU or WITH_ASCEND_CL option "
......@@ -145,6 +156,27 @@ platform::DeviceContext* DeviceContextPool::Get(const platform::Place& place) {
return it->second.get().get();
}
size_t DeviceContextPool::size() const {
if (external_device_contexts_) {
return external_device_contexts_->size();
}
return device_contexts_.size();
}
const std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>&
DeviceContextPool::device_contexts() const {
if (external_device_contexts_) {
return *external_device_contexts_;
}
return device_contexts_;
}
void DeviceContextPool::SetDeviceContexts(
const std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
dev_ctxs) {
external_device_contexts_ = dev_ctxs;
}
template <typename DevCtx>
inline void EmplaceDeviceContext(
std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
......
......@@ -57,7 +57,7 @@ limitations under the License. */
#endif
#ifdef PADDLE_WITH_MKLDNN
#include "dnnl.hpp"
#include "dnnl.hpp" // NOLINT
#include "paddle/fluid/framework/data_layout.h"
#endif
......@@ -915,17 +915,22 @@ class DeviceContextPool {
const typename DefaultDeviceContextType<Place>::TYPE*>(Get(place));
}
size_t size() const { return device_contexts_.size(); }
size_t size() const;
const std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>&
device_contexts() const {
return device_contexts_;
}
device_contexts() const;
static void SetDeviceContexts(
const std::map<Place,
std::shared_future<std::unique_ptr<DeviceContext>>>*);
private:
static DeviceContextPool* pool;
std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>
device_contexts_;
static thread_local const std::map<
Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
external_device_contexts_; // not owned
DISABLE_COPY_AND_ASSIGN(DeviceContextPool);
};
......
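The thread_local external_device_contexts_ member added above is the core of this change: it lets a predictor substitute its private device-context map for the global pool on the calling thread only, so predictors running concurrently on other threads are unaffected. A condensed view of how AnalysisPredictor::ZeroCopyRun (earlier in this diff) uses it:

// Scoped, per-thread override around one run.
if (private_context_) {
  paddle::platform::DeviceContextPool::SetDeviceContexts(&device_contexts_);
}
// ... execute the program; DeviceContextPool::Get(place) on this thread now
// resolves to the predictor's private GPU context and stream ...
if (private_context_) {
  paddle::platform::DeviceContextPool::SetDeviceContexts(nullptr);  // restore
}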