From cab0f2f503b03ba6497b25fbc4598249417457d1 Mon Sep 17 00:00:00 2001 From: Wilber Date: Wed, 8 Jun 2022 21:16:18 +0800 Subject: [PATCH] thread_local method to support predictor stream. (#42785) --- paddle/fluid/inference/api/analysis_config.cc | 24 ++ .../fluid/inference/api/analysis_predictor.cc | 267 ++++++++++++++---- .../fluid/inference/api/analysis_predictor.h | 23 +- .../api/analysis_predictor_tester.cc | 115 +++++++- paddle/fluid/inference/api/api_impl.cc | 2 +- paddle/fluid/inference/api/api_impl.h | 2 +- paddle/fluid/inference/api/api_tester.cc | 4 +- .../inference/api/details/zero_copy_tensor.cc | 34 ++- .../api/details/zero_copy_tensor_test.cc | 17 +- .../inference/api/onnxruntime_predictor.cc | 16 +- .../inference/api/onnxruntime_predictor.h | 5 +- .../inference/api/paddle_analysis_config.h | 21 ++ paddle/fluid/inference/api/paddle_api.h | 10 +- .../inference/api/paddle_inference_api.h | 10 +- paddle/fluid/inference/api/paddle_tensor.h | 3 +- .../paddle_infer_api_copy_tensor_tester.cc | 5 +- .../inference/tests/infer_ut/test_LeViT.cc | 67 +++++ paddle/fluid/platform/device_context.cc | 36 ++- paddle/fluid/platform/device_context.h | 15 +- 19 files changed, 576 insertions(+), 100 deletions(-) diff --git a/paddle/fluid/inference/api/analysis_config.cc b/paddle/fluid/inference/api/analysis_config.cc index c23397a0828..9fdc7a93cc2 100644 --- a/paddle/fluid/inference/api/analysis_config.cc +++ b/paddle/fluid/inference/api/analysis_config.cc @@ -100,6 +100,24 @@ void AnalysisConfig::EnableUseGpu(uint64_t memory_pool_init_size_mb, Update(); } +void AnalysisConfig::SetExecStream(void *stream) { + PADDLE_ENFORCE_NOT_NULL(stream, platform::errors::InvalidArgument( + "`stream` should not be nullptr")); + exec_stream_ = stream; + use_external_stream_ = true; + Update(); +} + +void *AnalysisConfig::GetExecStream() const { + PADDLE_ENFORCE_NOT_NULL(exec_stream_, platform::errors::InvalidArgument( + "`stream` should not be nullptr")); + return exec_stream_; +} + +bool AnalysisConfig::external_stream_enabled() const { + return use_external_stream_; +} + void AnalysisConfig::DisableGpu() { use_gpu_ = false; @@ -239,6 +257,8 @@ AnalysisConfig::AnalysisConfig(const AnalysisConfig &other) { CP_MEMBER(use_fc_padding_); // GPU related. CP_MEMBER(use_gpu_); + CP_MEMBER(use_external_stream_); + CP_MEMBER(exec_stream_); CP_MEMBER(use_cudnn_); CP_MEMBER(gpu_device_id_); CP_MEMBER(memory_pool_init_size_mb_); @@ -787,6 +807,8 @@ std::string AnalysisConfig::SerializeInfoCache() { ss << params_file_; ss << use_gpu_; + ss << use_external_stream_; + ss << exec_stream_; ss << use_gpu_fp16_; for (auto &item : gpu_fp16_disabled_op_types_) ss << item; ss << use_fc_padding_; @@ -985,6 +1007,8 @@ std::string AnalysisConfig::Summary() { os.InsertRow({"gpu_device_id", std::to_string(gpu_device_id_)}); os.InsertRow({"memory_pool_init_size", std::to_string(memory_pool_init_size_mb_) + "MB"}); + os.InsertRow( + {"use_external_stream", use_external_stream_ ? "true" : "false"}); os.InsertRow( {"thread_local_stream", thread_local_stream_ ? 
"true" : "false"}); diff --git a/paddle/fluid/inference/api/analysis_predictor.cc b/paddle/fluid/inference/api/analysis_predictor.cc index 18229c302db..7f30b80224e 100644 --- a/paddle/fluid/inference/api/analysis_predictor.cc +++ b/paddle/fluid/inference/api/analysis_predictor.cc @@ -27,6 +27,7 @@ #include "paddle/fluid//platform/device/gpu/gpu_types.h" #include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_type.h" +#include "paddle/fluid/framework/generator.h" #include "paddle/fluid/framework/ir/fuse_pass_base.h" #include "paddle/fluid/framework/ir/pass.h" #include "paddle/fluid/framework/naive_executor.h" @@ -37,6 +38,7 @@ #include "paddle/fluid/inference/analysis/helper.h" #include "paddle/fluid/inference/analysis/passes/memory_optimize_pass.h" #include "paddle/fluid/inference/api/helper.h" +#include "paddle/fluid/inference/api/infer_context.h" #include "paddle/fluid/inference/api/paddle_inference_api.h" #include "paddle/fluid/inference/api/paddle_inference_pass.h" #include "paddle/fluid/inference/utils/io_utils.h" @@ -198,6 +200,9 @@ bool AnalysisPredictor::Init( if (!PrepareScope(parent_scope)) { return false; } + + InitPlace(); + if (!CreateExecutor()) { return false; } @@ -213,56 +218,32 @@ bool AnalysisPredictor::Init( return true; } - return true; -} - -bool AnalysisPredictor::PrepareScope( - const std::shared_ptr &parent_scope) { - if (parent_scope) { - PADDLE_ENFORCE_NOT_NULL( - parent_scope, - platform::errors::PreconditionNotMet( - "Both program and parent_scope should be set in Clone mode.")); - scope_ = parent_scope; - status_is_cloned_ = true; - } else { - paddle::framework::InitDevices(); - paddle::framework::InitDefaultKernelSignatureMap(); - // TODO(wilber): we need to release memory occupied by weights. - scope_.reset(new paddle::framework::Scope()); - status_is_cloned_ = false; +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + // TODO(inference): Now only gpu with external stream support private + // device_context. + if (config_.use_gpu_ && config_.use_external_stream_) { + private_context_ = true; + } + if (private_context_) { + if (!status_is_cloned_) { + predictor_stream_ = config_.GetExecStream(); + } + // NOTE: If the external_stream equals to global_device_contexts's stream, + // then fallback. + auto global_stream = + static_cast( + platform::DeviceContextPool::Instance().Get(place_)) + ->stream(); + if (predictor_stream_ != global_stream) { + InitResourceManager(predictor_stream_); + InitDeviceContexts(); + } } - sub_scope_ = &scope_->NewScope(); +#endif return true; } -bool AnalysisPredictor::PrepareProgram( - const std::shared_ptr &program) { - if (!program) { - if (!LoadProgramDesc()) return false; - // If not cloned, the parameters should be loaded. - // If config_.ir_optim() is True, parameters is loaded in - // OptimizeInferenceProgram(), but other persistable variables - // (like RAW type var) are not created in scope. - // If config_.ir_optim() is False, parameters is loaded in LoadParameters(), - // still need to create other persistable variables. - // So in both case, create persistable variables at first. - executor_->CreateVariables(*inference_program_, 0, true, sub_scope_); - // if enable_ir_optim_ is false, - // the analysis pass(op fuse, graph analysis, trt subgraph, mkldnn etc) will - // not be executed. - OptimizeInferenceProgram(); - } else { - // If the program is passed from external, no need to optimize it, this - // logic is used in the clone scenario. 
- inference_program_ = program; - } - - executor_->CreateVariables(*inference_program_, 0, false, sub_scope_); - - return true; -} -bool AnalysisPredictor::CreateExecutor() { +void AnalysisPredictor::InitPlace() { if (config_.use_gpu()) { PADDLE_ENFORCE_EQ(config_.use_xpu(), false, platform::errors::InvalidArgument( @@ -345,6 +326,160 @@ bool AnalysisPredictor::CreateExecutor() { } else { place_ = paddle::platform::CPUPlace(); } +} + +void AnalysisPredictor::InitResourceManager(void *stream) { +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + predictor_stream_ = + ResourceManager::Instance().InitGPUResource(place_, stream); +#endif +} + +void AnalysisPredictor::InitDeviceContexts() { +// Init GPUContext. +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + if (place_.GetType() == phi::AllocationType::GPU) { + device_contexts_.emplace( + place_, std::async(std::launch::deferred, [=] { + auto *gpu_resource = + ResourceManager::Instance().GetGPUResource(predictor_stream_); + auto *gpu_context = new InferGPUContext(); + gpu_context->SetAllocator( + memory::allocation::AllocatorFacade::Instance() + .GetAllocator(place_, gpu_resource->GetStream()) + .get()); + gpu_context->SetPinnedAllocator( + memory::allocation::AllocatorFacade::Instance() + .GetAllocator(paddle::platform::CUDAPinnedPlace()) + .get()); + gpu_context->SetHostAllocator( + memory::allocation::AllocatorFacade::Instance() + .GetAllocator(platform::CPUPlace()) + .get()); + gpu_context->SetZeroAllocator( + memory::allocation::AllocatorFacade::Instance() + .GetZeroAllocator(place_) + .get()); + gpu_context->SetGenerator( + framework::DefaultCUDAGenerator(place_.GetDeviceId()).get()); + gpu_context->SetHostGenerator(framework::DefaultCPUGenerator().get()); + + gpu_context->SetStream(gpu_resource->GetStream()); + gpu_context->SetBlasHandle(gpu_resource->GetBlasHandle()); + gpu_context->SetBlasTensorCoreHandle( + gpu_resource->GetBlasTensorCoreHandle()); + gpu_context->SetBlasTF32Handle(gpu_resource->GetBlasTF32Handle()); + gpu_context->SetDnnHandle(gpu_resource->GetDnnHandle()); + gpu_context->SetSolverHandle(gpu_resource->GetSolverDnHandle()); + gpu_context->SetSparseHandle(gpu_resource->GetSparseHandle()); + gpu_context->SetEigenDevice(gpu_resource->GetGpuEigenDevice()); + gpu_context->SetComputeCapability( + gpu_resource->GetGpuComputeCapability()); + gpu_context->SetMaxThreadsPerBlock( + gpu_resource->GetGpuMaxThreadsPerBlock()); + gpu_context->SetMaxThreadsPerMultiProcessor( + gpu_resource->GetGpuMaxThreadsPerMp()); + gpu_context->SetMaxGridDimSize(gpu_resource->GetGpuMaxGridDimSize()); + gpu_context->SetMultiProcessors( + gpu_resource->GetGPUMultiProcessors()); + gpu_context->SetDriverVersion(gpu_resource->GetGpuDriverVersion()); + gpu_context->SetRuntimeVersion(gpu_resource->GetGpuRuntimeVersion()); + VLOG(1) << "thread id is " << std::this_thread::get_id() + << ", stream id is " + << reinterpret_cast(gpu_resource->GetStream()) + << ", allotor ptr is " + << reinterpret_cast( + memory::allocation::AllocatorFacade::Instance() + .GetAllocator(place_, gpu_resource->GetStream()) + .get()); + return std::unique_ptr(gpu_context); + })); + } +#endif + // TODO(Inference): Support other backends. 
+} + +void *AnalysisPredictor::GetExecStream() const { +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + if (place_.GetType() == phi::AllocationType::GPU) { + if (private_context_) { + return predictor_stream_; + } else { + paddle::platform::DeviceContextPool &pool = + paddle::platform::DeviceContextPool::Instance(); + return reinterpret_cast(pool.Get(place_)) + ->stream(); + } + } else { + return nullptr; + } + return nullptr; +#else + // TODO(inference): Support other backends. + return nullptr; +#endif +} + +const void *AnalysisPredictor::GetDeviceContexts() const { + if (private_context_) { + return &device_contexts_; + } else { + paddle::platform::DeviceContextPool &pool = + paddle::platform::DeviceContextPool::Instance(); + const auto &dev_ctxs = pool.device_contexts(); + return &dev_ctxs; + } +} + +bool AnalysisPredictor::PrepareScope( + const std::shared_ptr &parent_scope) { + if (parent_scope) { + PADDLE_ENFORCE_NOT_NULL( + parent_scope, + platform::errors::PreconditionNotMet( + "Both program and parent_scope should be set in Clone mode.")); + scope_ = parent_scope; + status_is_cloned_ = true; + } else { + paddle::framework::InitDevices(); + paddle::framework::InitDefaultKernelSignatureMap(); + // TODO(wilber): we need to release memory occupied by weights. + scope_.reset(new paddle::framework::Scope()); + status_is_cloned_ = false; + } + sub_scope_ = &scope_->NewScope(); + return true; +} + +bool AnalysisPredictor::PrepareProgram( + const std::shared_ptr &program) { + if (!program) { + if (!LoadProgramDesc()) return false; + // If not cloned, the parameters should be loaded. + // If config_.ir_optim() is True, parameters is loaded in + // OptimizeInferenceProgram(), but other persistable variables + // (like RAW type var) are not created in scope. + // If config_.ir_optim() is False, parameters is loaded in LoadParameters(), + // still need to create other persistable variables. + // So in both case, create persistable variables at first. + executor_->CreateVariables(*inference_program_, 0, true, sub_scope_); + + // if enable_ir_optim_ is false, + // the analysis pass(op fuse, graph analysis, trt subgraph, mkldnn etc) will + // not be executed. + OptimizeInferenceProgram(); + } else { + // If the program is passed from external, no need to optimize it, this + // logic is used in the clone scenario. 
+ inference_program_ = program; + } + + executor_->CreateVariables(*inference_program_, 0, false, sub_scope_); + + return true; +} + +bool AnalysisPredictor::CreateExecutor() { executor_.reset(new paddle::framework::NaiveExecutor(place_)); return true; } @@ -1222,8 +1357,8 @@ std::unique_ptr AnalysisPredictor::GetInputTensor( platform::errors::PreconditionNotMet( "The variable named %s is not found in the scope of the executor.", name)); - std::unique_ptr res( - new ZeroCopyTensor(static_cast(scope))); + std::unique_ptr res(new ZeroCopyTensor( + static_cast(scope), this->GetDeviceContexts())); res->input_or_output_ = true; res->SetName(name); if (platform::is_cpu_place(place_)) { @@ -1277,8 +1412,8 @@ std::unique_ptr AnalysisPredictor::GetOutputTensor( platform::errors::PreconditionNotMet( "The variable named %s is not found in the scope of the executor.", name)); - std::unique_ptr res( - new ZeroCopyTensor(static_cast(scope))); + std::unique_ptr res(new ZeroCopyTensor( + static_cast(scope), this->GetDeviceContexts())); res->input_or_output_ = false; res->SetName(name); if (platform::is_cpu_place(place_)) { @@ -1327,6 +1462,9 @@ bool AnalysisPredictor::ZeroCopyRun() { return true; } #endif + if (private_context_) { + paddle::platform::DeviceContextPool::SetDeviceContexts(&device_contexts_); + } paddle::platform::SetNumThreads(config_.cpu_math_library_num_threads()); #ifdef PADDLE_WITH_MKLDNN if (config_.use_mkldnn_) { @@ -1352,6 +1490,9 @@ bool AnalysisPredictor::ZeroCopyRun() { // recover the cpu_math_library_num_threads to 1, in order to avoid thread // conflict when integrating it into deployment service. paddle::platform::SetNumThreads(1); + if (private_context_) { + paddle::platform::DeviceContextPool::SetDeviceContexts(nullptr); + } #ifdef PADDLE_WITH_MKLDNN if (config_.use_mkldnn_) MkldnnPostReset(); #endif @@ -1659,15 +1800,31 @@ AnalysisPredictor::~AnalysisPredictor() { if (config_.shape_range_info_collected()) { StatisticShapeRangeInfo(); } - +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + if (predictor_stream_ != nullptr) { + ResourceManager::Instance().DestroyGPUResource(predictor_stream_); + } +#endif if (place_.GetType() != phi::AllocationType::UNDEFINED) { memory::Release(place_); } + device_contexts_.clear(); } -std::unique_ptr AnalysisPredictor::Clone() { +std::unique_ptr AnalysisPredictor::Clone(void *stream) { std::lock_guard lk(clone_mutex_); auto *x = new AnalysisPredictor(config_); + x->status_is_cloned_ = true; + if (config_.use_external_stream_ && stream == nullptr) { + PADDLE_THROW(platform::errors::InvalidArgument( + "config has been configured to use external stream, but the Clone " + "function has not received a valid stream parameter.")); + } else if (!config_.use_external_stream_ && stream != nullptr) { + PADDLE_THROW(platform::errors::InvalidArgument( + "config has not been configured to use external stream, but the Clone " + "function has received a stream parameter.")); + } + x->predictor_stream_ = stream; x->Init(scope_, inference_program_); x->executor_->ResetTrtOps(++AnalysisPredictor::clone_num_); return std::unique_ptr(x); @@ -1853,8 +2010,8 @@ std::unique_ptr Predictor::GetOutputHandle(const std::string &name) { bool Predictor::Run() { return predictor_->ZeroCopyRun(); } -std::unique_ptr Predictor::Clone() { - auto analysis_pred = predictor_->Clone(); +std::unique_ptr Predictor::Clone(void *stream) { + auto analysis_pred = predictor_->Clone(stream); std::unique_ptr pred(new Predictor(std::move(analysis_pred))); return pred; } @@ -1865,6 
+2022,8 @@ void Predictor::ClearIntermediateTensor() { uint64_t Predictor::TryShrinkMemory() { return predictor_->TryShrinkMemory(); } +void *Predictor::GetExecStream() const { return predictor_->GetExecStream(); } + int GetNumBytesOfDataType(DataType dtype) { switch (dtype) { case DataType::FLOAT32: diff --git a/paddle/fluid/inference/api/analysis_predictor.h b/paddle/fluid/inference/api/analysis_predictor.h index 1cfdaf1a558..ff17353f303 100644 --- a/paddle/fluid/inference/api/analysis_predictor.h +++ b/paddle/fluid/inference/api/analysis_predictor.h @@ -28,6 +28,7 @@ #include "paddle/fluid/inference/api/details/reset_tensor_array.h" #include "paddle/fluid/inference/api/helper.h" #include "paddle/fluid/inference/api/paddle_inference_api.h" +#include "paddle/fluid/inference/api/resource_manager.h" #include "paddle/fluid/platform/device/gpu/gpu_types.h" #include "paddle/fluid/platform/float16.h" #include "paddle/fluid/string/printf.h" @@ -184,6 +185,14 @@ class AnalysisPredictor : public PaddlePredictor { bool ExpRunWithExternalStream(const gpuStream_t stream); #endif + /// + /// \brief Get the execution stream on devices with a concept of stream, + /// otherwise returns nullptr. + /// + /// \return The execution stream or nullptr (CPU). + /// + void *GetExecStream() const override; + /// /// \brief Create feed fetch variables /// @@ -235,7 +244,7 @@ class AnalysisPredictor : public PaddlePredictor { /// /// \return get a new predictor /// - std::unique_ptr Clone() override; + std::unique_ptr Clone(void *stream = nullptr) override; /// /// \brief Get the scope used by predictor /// @@ -393,10 +402,17 @@ class AnalysisPredictor : public PaddlePredictor { FRIEND_TEST(AnalysisPredictor, with_gpu); #endif + protected: + const void *GetDeviceContexts() const override; + private: void StatisticShapeRangeInfo(); void CollectShapeRangeInfo(); + void InitPlace(); + void InitDeviceContexts(); + void InitResourceManager(void *stream); + #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) // fleet exe related @@ -489,6 +505,11 @@ class AnalysisPredictor : public PaddlePredictor { std::map>> shape_info_; static int clone_num_; + bool private_context_{false}; + void *predictor_stream_{nullptr}; + std::map>> + device_contexts_; + #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) // fleet executor related distributed::FleetExecutorDesc executor_desc_; diff --git a/paddle/fluid/inference/api/analysis_predictor_tester.cc b/paddle/fluid/inference/api/analysis_predictor_tester.cc index f16054565a7..1e45c245342 100644 --- a/paddle/fluid/inference/api/analysis_predictor_tester.cc +++ b/paddle/fluid/inference/api/analysis_predictor_tester.cc @@ -13,6 +13,7 @@ // limitations under the License. 
#include "paddle/fluid/inference/api/analysis_predictor.h" +#include "paddle/fluid/inference/api/resource_manager.h" #if defined(PADDLE_WITH_CUDA) #include #endif @@ -183,18 +184,11 @@ TEST(AnalysisPredictor, CollectShapeRangeInfo) { w1->Reshape({4, 1}); w2->Reshape({4, 1}); w3->Reshape({4, 1}); - - auto* w0_data = w0->mutable_data(PaddlePlace::kCPU); - auto* w1_data = w1->mutable_data(PaddlePlace::kCPU); - auto* w2_data = w2->mutable_data(PaddlePlace::kCPU); - auto* w3_data = w3->mutable_data(PaddlePlace::kCPU); - - for (int i = 0; i < 4; i++) { - w0_data[i] = i; - w1_data[i] = i; - w2_data[i] = i; - w3_data[i] = i; - } + std::vector input_data{0, 1, 2, 3}; + w0->copy_from_cpu(input_data.data()); + w1->copy_from_cpu(input_data.data()); + w2->copy_from_cpu(input_data.data()); + w3->copy_from_cpu(input_data.data()); predictor->ZeroCopyRun(); @@ -539,6 +533,103 @@ TEST(Tensor, GpuShareExternalData) { LOG(INFO) << "output size: " << size / sizeof(float); predictor->TryShrinkMemory(); } + +TEST(Predictor, Streams) { + // internal stream. + { + Config config; + config.SetModel(FLAGS_dirname); + config.EnableUseGpu(100, 0); + auto predictor = CreatePredictor(config); + gpuStream_t stream = + reinterpret_cast(predictor->GetExecStream()); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 0); + } + + // internal stream, create 2 predictor. + { + Config config1; + config1.SetModel(FLAGS_dirname); + config1.EnableUseGpu(100, 0); + auto predictor1 = CreatePredictor(config1); + gpuStream_t stream1 = + reinterpret_cast(predictor1->GetExecStream()); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream1), 0); + + Config config2; + config2.SetModel(FLAGS_dirname); + config2.EnableUseGpu(100, 0); + auto predictor2 = CreatePredictor(config2); + gpuStream_t stream2 = + reinterpret_cast(predictor2->GetExecStream()); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream2), 0); + CHECK_EQ(stream1, stream2); + } + + // internal stream, clone + { + Config config; + config.SetModel(FLAGS_dirname); + config.EnableUseGpu(100, 0); + auto predictor = CreatePredictor(config); + gpuStream_t stream = + reinterpret_cast(predictor->GetExecStream()); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 0); + + auto predictor2 = predictor->Clone(); + gpuStream_t stream2 = + reinterpret_cast(predictor2->GetExecStream()); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream2), 0); + CHECK_EQ(stream, stream2); + } + + // external stream + { + cudaStream_t external_stream; + cudaStreamCreate(&external_stream); + Config config; + config.SetModel(FLAGS_dirname); + config.EnableUseGpu(100, 0); + config.SetExecStream(external_stream); + CHECK_EQ(config.external_stream_enabled(), true); + + auto predictor = CreatePredictor(config); + gpuStream_t stream = + reinterpret_cast(predictor->GetExecStream()); + CHECK_EQ(external_stream, stream); + CHECK_NOTNULL(paddle::ResourceManager::Instance().GetGPUResource(stream)); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 1); + } + + // 2 predictor on 2 stream + { + cudaStream_t external_stream; + cudaStreamCreate(&external_stream); + Config config; + config.SetModel(FLAGS_dirname); + config.EnableUseGpu(100, 0); + config.SetExecStream(external_stream); + auto predictor = CreatePredictor(config); + gpuStream_t stream = + reinterpret_cast(predictor->GetExecStream()); + CHECK_NOTNULL(paddle::ResourceManager::Instance().GetGPUResource(stream)); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 1); + + 
cudaStream_t external_stream2; + cudaStreamCreate(&external_stream2); + Config config2; + config2.SetModel(FLAGS_dirname); + config2.EnableUseGpu(100, 0); + config2.SetExecStream(external_stream2); + auto predictor2 = CreatePredictor(config2); + gpuStream_t stream2 = + reinterpret_cast(predictor2->GetExecStream()); + CHECK_NOTNULL(paddle::ResourceManager::Instance().GetGPUResource(stream2)); + CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream2), 1); + + CHECK_NE(stream, stream2); + } +} #endif } // namespace paddle_infer diff --git a/paddle/fluid/inference/api/api_impl.cc b/paddle/fluid/inference/api/api_impl.cc index 38960aecb70..28f2010161d 100644 --- a/paddle/fluid/inference/api/api_impl.cc +++ b/paddle/fluid/inference/api/api_impl.cc @@ -181,7 +181,7 @@ bool NativePaddlePredictor::Run(const std::vector &inputs, return true; } -std::unique_ptr NativePaddlePredictor::Clone() { +std::unique_ptr NativePaddlePredictor::Clone(void *stream) { std::lock_guard lk(clone_mutex_); VLOG(3) << "Predictor::clone"; std::unique_ptr cls(new NativePaddlePredictor(config_)); diff --git a/paddle/fluid/inference/api/api_impl.h b/paddle/fluid/inference/api/api_impl.h index d503d258139..14a7e22e644 100644 --- a/paddle/fluid/inference/api/api_impl.h +++ b/paddle/fluid/inference/api/api_impl.h @@ -51,7 +51,7 @@ class NativePaddlePredictor : public PaddlePredictor { std::vector *output_data, int batch_size = -1) override; - std::unique_ptr Clone() override; + std::unique_ptr Clone(void *stream = nullptr) override; ~NativePaddlePredictor() override; diff --git a/paddle/fluid/inference/api/api_tester.cc b/paddle/fluid/inference/api/api_tester.cc index 1faf46fad2b..cc6527a7e55 100644 --- a/paddle/fluid/inference/api/api_tester.cc +++ b/paddle/fluid/inference/api/api_tester.cc @@ -46,7 +46,9 @@ class DemoPredictor : public PaddlePredictor { return false; } - std::unique_ptr Clone() override { return nullptr; } + std::unique_ptr Clone(void *stream = nullptr) override { + return nullptr; + } ~DemoPredictor() override {} }; diff --git a/paddle/fluid/inference/api/details/zero_copy_tensor.cc b/paddle/fluid/inference/api/details/zero_copy_tensor.cc index 661d9def406..ae0af77319e 100644 --- a/paddle/fluid/inference/api/details/zero_copy_tensor.cc +++ b/paddle/fluid/inference/api/details/zero_copy_tensor.cc @@ -94,7 +94,17 @@ T *Tensor::mutable_data(PlaceType place) { return tensor->mutable_data(paddle::platform::CPUPlace()); } case static_cast(PlaceType::kGPU): { +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + paddle::platform::CUDAPlace gpu_place(device_); + auto *dev_ctxs = reinterpret_cast>> + *>(device_contexs_); + auto *dev_ctx = + static_cast(dev_ctxs->at(gpu_place).get().get()); + return dev_ctx->Alloc(tensor, tensor->numel() * sizeof(T)); +#else return tensor->mutable_data(paddle::platform::CUDAPlace(device_)); +#endif } case static_cast(PlaceType::kXPU): { return tensor->mutable_data(paddle::platform::XPUPlace(device_)); @@ -181,12 +191,14 @@ void Tensor::CopyFromCpu(const T *data) { std::memcpy(static_cast(t_data), data, ele_size); } else if (place_ == PlaceType::kGPU) { #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) - paddle::platform::DeviceContextPool &pool = - paddle::platform::DeviceContextPool::Instance(); + paddle::platform::CUDAPlace gpu_place(device_); - auto *t_data = tensor->mutable_data(gpu_place); - auto *dev_ctx = static_cast( - pool.Get(gpu_place)); + auto *dev_ctxs = reinterpret_cast>> *>( + device_contexs_); + auto *dev_ctx = + 
static_cast(dev_ctxs->at(gpu_place).get().get()); + auto *t_data = dev_ctx->Alloc(tensor, tensor->numel() * sizeof(T)); paddle::memory::Copy(gpu_place, static_cast(t_data), paddle::platform::CPUPlace(), data, ele_size, @@ -359,11 +371,12 @@ void Tensor::CopyToCpuImpl(T *data, void *exec_stream, CallbackFunc cb, #endif } else if (place_ == PlaceType::kGPU) { #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) - paddle::platform::DeviceContextPool &pool = - paddle::platform::DeviceContextPool::Instance(); auto gpu_place = t_place; - auto *dev_ctx = static_cast( - pool.Get(gpu_place)); + auto *dev_ctxs = reinterpret_cast>> *>( + device_contexs_); + auto *dev_ctx = + static_cast(dev_ctxs->at(gpu_place).get().get()); paddle::memory::Copy(paddle::platform::CPUPlace(), static_cast(data), gpu_place, t_data, ele_num * sizeof(T), dev_ctx->stream()); @@ -547,7 +560,8 @@ template PD_INFER_DECL uint8_t *Tensor::mutable_data(PlaceType place); template PD_INFER_DECL int8_t *Tensor::mutable_data(PlaceType place); template PD_INFER_DECL float16 *Tensor::mutable_data(PlaceType place); -Tensor::Tensor(void *scope) : scope_{scope} {} +Tensor::Tensor(void *scope, const void *device_contexts) + : scope_{scope}, device_contexs_(device_contexts) {} template void *Tensor::FindTensor() const { diff --git a/paddle/fluid/inference/api/details/zero_copy_tensor_test.cc b/paddle/fluid/inference/api/details/zero_copy_tensor_test.cc index 4b6f90f3f06..5a04656bc30 100644 --- a/paddle/fluid/inference/api/details/zero_copy_tensor_test.cc +++ b/paddle/fluid/inference/api/details/zero_copy_tensor_test.cc @@ -25,14 +25,19 @@ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/inference/api/helper.h" #include "paddle/fluid/inference/api/paddle_tensor.h" +#include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/place.h" namespace paddle_infer { struct TensorWrapper : public Tensor { - TensorWrapper(paddle_infer::PlaceType place, paddle::framework::Scope* scope, - const std::string& name) - : Tensor{static_cast(scope)} { + TensorWrapper( + paddle_infer::PlaceType place, paddle::framework::Scope* scope, + const std::map>>* + dev_ctxs, + const std::string& name) + : Tensor{static_cast(scope), dev_ctxs} { SetPlace(place, 0 /*device_id*/); SetName(name); input_or_output_ = true; @@ -42,7 +47,11 @@ struct TensorWrapper : public Tensor { std::unique_ptr CreateTensor(paddle_infer::PlaceType place, paddle::framework::Scope* scope, const std::string& name) { - return std::unique_ptr(new TensorWrapper{place, scope, name}); + paddle::platform::DeviceContextPool& pool = + paddle::platform::DeviceContextPool::Instance(); + const auto& dev_ctxs = pool.device_contexts(); + return std::unique_ptr( + new TensorWrapper{place, scope, &dev_ctxs, name}); } template diff --git a/paddle/fluid/inference/api/onnxruntime_predictor.cc b/paddle/fluid/inference/api/onnxruntime_predictor.cc index 93a96863053..326da0e4339 100644 --- a/paddle/fluid/inference/api/onnxruntime_predictor.cc +++ b/paddle/fluid/inference/api/onnxruntime_predictor.cc @@ -243,7 +243,7 @@ std::unique_ptr ONNXRuntimePredictor::GetInputTensor( "The in variable named %s is not found in the " "ONNXPredictor.", name)); - std::unique_ptr res(new ZeroCopyTensor(nullptr)); + std::unique_ptr res(new ZeroCopyTensor(nullptr, this)); res->input_or_output_ = true; res->SetName(name); if (platform::is_cpu_place(place_)) { @@ -264,7 +264,7 @@ std::unique_ptr ONNXRuntimePredictor::GetOutputTensor( "The out variable named %s is not found in the " 
"ONNXPredictor.", name)); - std::unique_ptr res(new ZeroCopyTensor(nullptr)); + std::unique_ptr res(new ZeroCopyTensor(nullptr, this)); res->input_or_output_ = false; res->SetName(name); if (platform::is_cpu_place(place_)) { @@ -309,7 +309,7 @@ bool ONNXRuntimePredictor::ZeroCopyRun() { return true; } -std::unique_ptr ONNXRuntimePredictor::Clone() { +std::unique_ptr ONNXRuntimePredictor::Clone(void *stream) { LOG(ERROR) << "Not support Clone(), Please create new Predictor"; return nullptr; } @@ -325,4 +325,14 @@ ONNXRuntimePredictor::~ONNXRuntimePredictor() { memory::Release(place_); } +const void *ONNXRuntimePredictor::GetDeviceContexts() const { + // TODO(inference): Support private device contexts. + paddle::platform::DeviceContextPool &pool = + paddle::platform::DeviceContextPool::Instance(); + const auto &dev_ctxs = pool.device_contexts(); + return &const_cast>> &>( + dev_ctxs); +} + } // namespace paddle diff --git a/paddle/fluid/inference/api/onnxruntime_predictor.h b/paddle/fluid/inference/api/onnxruntime_predictor.h index 294a83a4335..4c44a7dc0a9 100644 --- a/paddle/fluid/inference/api/onnxruntime_predictor.h +++ b/paddle/fluid/inference/api/onnxruntime_predictor.h @@ -174,7 +174,10 @@ class ONNXRuntimePredictor : public PaddlePredictor { /// /// \return get a new predictor /// - std::unique_ptr Clone() override; + std::unique_ptr Clone(void *stream = nullptr) override; + + protected: + const void *GetDeviceContexts() const override; private: /// diff --git a/paddle/fluid/inference/api/paddle_analysis_config.h b/paddle/fluid/inference/api/paddle_analysis_config.h index 489c32bc59d..fe82bbf29cb 100644 --- a/paddle/fluid/inference/api/paddle_analysis_config.h +++ b/paddle/fluid/inference/api/paddle_analysis_config.h @@ -585,6 +585,25 @@ struct PD_INFER_DECL AnalysisConfig { /// bool trt_allow_build_at_runtime(); + /// + /// \brief Set execution stream. If not set a stream will be created + /// internally. + /// + void SetExecStream(void* stream); + + /// + /// \brief Get execution stream. The user needs to explicitly cast into a + /// stream type such as cudaStream_t, hipStream_t, etc. + /// + void* GetExecStream() const; + + /// + /// \brief Whether the external stream is used, if True, the predictor clone + /// operation must use the external stream, otherwise the framework manages + /// the stream internally. + /// + bool external_stream_enabled() const; + /// /// \brief Collect shape info of all tensors in compute graph. /// @@ -926,6 +945,8 @@ struct PD_INFER_DECL AnalysisConfig { "matrix_nms"}; bool use_cudnn_{false}; + bool use_external_stream_{false}; + void* exec_stream_{nullptr}; // NPU related bool use_npu_{false}; diff --git a/paddle/fluid/inference/api/paddle_api.h b/paddle/fluid/inference/api/paddle_api.h index 78af756c24b..b28370fb822 100644 --- a/paddle/fluid/inference/api/paddle_api.h +++ b/paddle/fluid/inference/api/paddle_api.h @@ -195,7 +195,8 @@ class PD_INFER_DECL ZeroCopyTensor : public paddle_infer::Tensor { private: friend class AnalysisPredictor; friend class ONNXRuntimePredictor; - explicit ZeroCopyTensor(void* scope) : paddle_infer::Tensor{scope} {} + explicit ZeroCopyTensor(void* scope, const void* device_contexts) + : paddle_infer::Tensor{scope, device_contexts} {} }; /// \brief A Predictor for executing inference on a model. @@ -286,7 +287,7 @@ class PD_INFER_DECL PaddlePredictor { /// When using clone, the same network will be created, /// and the parameters between them are shared. 
/// \return unique_ptr which contains the pointer of predictor - virtual std::unique_ptr Clone() = 0; + virtual std::unique_ptr Clone(void* stream = nullptr) = 0; /// \brief Destroy the Predictor. virtual ~PaddlePredictor() = default; @@ -300,6 +301,11 @@ class PD_INFER_DECL PaddlePredictor { struct Config { std::string model_dir; /*!< path to the model directory. */ }; + + virtual void* GetExecStream() const { return nullptr; } + + protected: + virtual const void* GetDeviceContexts() const { return nullptr; } }; /// diff --git a/paddle/fluid/inference/api/paddle_inference_api.h b/paddle/fluid/inference/api/paddle_inference_api.h index 58ccd79d84d..3111db026c4 100644 --- a/paddle/fluid/inference/api/paddle_inference_api.h +++ b/paddle/fluid/inference/api/paddle_inference_api.h @@ -133,7 +133,7 @@ class PD_INFER_DECL Predictor { /// /// \return get a new predictor /// - std::unique_ptr Clone(); + std::unique_ptr Clone(void* stream = nullptr); /// \brief Clear the intermediate tensors of the predictor void ClearIntermediateTensor(); @@ -149,6 +149,14 @@ class PD_INFER_DECL Predictor { /// uint64_t TryShrinkMemory(); + /// + /// \brief Get the execution stream on devices with a concept of stream, + /// otherwise returns nullptr. + /// + /// \return The execution stream or nullptr (CPU). + /// + void* GetExecStream() const; + private: std::unique_ptr predictor_; friend class paddle_infer::experimental::InternalUtils; diff --git a/paddle/fluid/inference/api/paddle_tensor.h b/paddle/fluid/inference/api/paddle_tensor.h index 11086b369fc..39ba366f35d 100644 --- a/paddle/fluid/inference/api/paddle_tensor.h +++ b/paddle/fluid/inference/api/paddle_tensor.h @@ -162,7 +162,7 @@ class PD_INFER_DECL Tensor { PlaceType place() const; protected: - explicit Tensor(void* scope); + explicit Tensor(void* scope, const void* device_contexs); template void* FindTensor() const; @@ -181,6 +181,7 @@ class PD_INFER_DECL Tensor { DataType dtype_; bool input_or_output_; void* scope_{nullptr}; + const void* device_contexs_{nullptr}; PlaceType place_; int device_; diff --git a/paddle/fluid/inference/tests/api/paddle_infer_api_copy_tensor_tester.cc b/paddle/fluid/inference/tests/api/paddle_infer_api_copy_tensor_tester.cc index 38bcb7645ab..36fcf4ba8da 100644 --- a/paddle/fluid/inference/tests/api/paddle_infer_api_copy_tensor_tester.cc +++ b/paddle/fluid/inference/tests/api/paddle_infer_api_copy_tensor_tester.cc @@ -32,7 +32,10 @@ class InferApiTesterUtils { const std::string &name, PlaceType place, void *p_scope) { auto var = static_cast(p_scope)->Var(name); var->GetMutable(); - std::unique_ptr res(new Tensor(p_scope)); + paddle::platform::DeviceContextPool &pool = + paddle::platform::DeviceContextPool::Instance(); + const auto &dev_ctxs = pool.device_contexts(); + std::unique_ptr res(new Tensor(p_scope, &dev_ctxs)); res->input_or_output_ = true; res->SetName(name); res->SetPlace(place, 0 /*device id*/); diff --git a/paddle/fluid/inference/tests/infer_ut/test_LeViT.cc b/paddle/fluid/inference/tests/infer_ut/test_LeViT.cc index b069feaec1a..c9298692334 100644 --- a/paddle/fluid/inference/tests/infer_ut/test_LeViT.cc +++ b/paddle/fluid/inference/tests/infer_ut/test_LeViT.cc @@ -13,6 +13,9 @@ // limitations under the License. 
#include "test_suite.h" // NOLINT +#ifdef PADDLE_WITH_GPU +#include +#endif DEFINE_string(modeldir, "", "Directory of the inference model."); @@ -170,6 +173,70 @@ TEST(tensorrt_tester_LeViT, multi_thread4_trt_fp32_bz2) { std::cout << "finish multi-thread test" << std::endl; } +#ifdef PADDLE_WITH_GPU +TEST(tensorrt_tester_LeViT, multi_stream_thread4_trt_fp32_bz2) { + int thread_num = 4; + + // init stream + std::vector streams(thread_num); + for (size_t i = 0; i < thread_num; ++i) { + cudaStreamCreate(&streams[i]); + } + + // init input data + std::map my_input_data_map; + my_input_data_map["x"] = PrepareInput(2); + // init output data + std::map infer_output_data, + truth_output_data; + // prepare groudtruth config + paddle_infer::Config config, config_no_ir; + config_no_ir.SetModel(FLAGS_modeldir + "/inference.pdmodel", + FLAGS_modeldir + "/inference.pdiparams"); + config_no_ir.SwitchIrOptim(false); + // prepare inference config + config.SetModel(FLAGS_modeldir + "/inference.pdmodel", + FLAGS_modeldir + "/inference.pdiparams"); + config.EnableUseGpu(100, 0); + config.EnableTensorRtEngine( + 1 << 20, 2, 50, paddle_infer::PrecisionType::kFloat32, false, false); + // get groudtruth by disbale ir + + paddle_infer::services::PredictorPool pred_pool_no_ir(config_no_ir, 1); + SingleThreadPrediction(pred_pool_no_ir.Retrive(0), &my_input_data_map, + &truth_output_data, 1); + + // get infer results from multi threads + std::vector threads; + config.SetExecStream(streams[0]); + config.pass_builder()->DeletePass("add_support_int8_pass"); + auto main_predictor = CreatePredictor(config); + std::vector predictors; + for (size_t i = 0; i < thread_num - 1; ++i) { + predictors.push_back(std::move(main_predictor->Clone(streams[i + 1]))); + LOG(INFO) << "predictors[" << i << "] stream is " + << predictors[i]->GetExecStream(); + } + predictors.push_back(std::move(main_predictor)); + LOG(INFO) << "predictors[" << thread_num - 1 << "] stream is " + << predictors[thread_num - 1]->GetExecStream(); + for (int i = 0; i < thread_num; ++i) { + threads.emplace_back(paddle::test::SingleThreadPrediction, + predictors[i].get(), &my_input_data_map, + &infer_output_data, 10); + } + + // thread join & check outputs + for (int i = 0; i < thread_num; ++i) { + LOG(INFO) << "join tid : " << i; + threads[i].join(); + // CompareRecord(&truth_output_data, &infer_output_data); + } + + std::cout << "finish multi-thread test" << std::endl; +} +#endif + } // namespace paddle_infer int main(int argc, char** argv) { diff --git a/paddle/fluid/platform/device_context.cc b/paddle/fluid/platform/device_context.cc index fd61b813f0a..d990aab5773 100644 --- a/paddle/fluid/platform/device_context.cc +++ b/paddle/fluid/platform/device_context.cc @@ -129,11 +129,22 @@ DeviceType Place2DeviceType(const platform::Place& place) { } DeviceContextPool* DeviceContextPool::pool = nullptr; +thread_local const std::map>>* + DeviceContextPool::external_device_contexts_ = nullptr; platform::DeviceContext* DeviceContextPool::Get(const platform::Place& place) { VLOG(6) << "DeviceContextPool Get: " << place; - auto it = device_contexts_.find(place); - if (it == device_contexts_.end()) { + const std::map>>* + ptr; + if (external_device_contexts_ && external_device_contexts_->count(place)) { + ptr = external_device_contexts_; + } else { + ptr = &device_contexts_; + } + + auto it = ptr->find(place); + if (it == ptr->end()) { PADDLE_THROW(platform::errors::Unimplemented( "Place %s is not supported. 
Please check that your paddle compiles " "with WITH_GPU, WITH_XPU, WITH_IPU, WITH_MLU or WITH_ASCEND_CL option " @@ -145,6 +156,27 @@ platform::DeviceContext* DeviceContextPool::Get(const platform::Place& place) { return it->second.get().get(); } +size_t DeviceContextPool::size() const { + if (external_device_contexts_) { + return external_device_contexts_->size(); + } + return device_contexts_.size(); +} + +const std::map>>& +DeviceContextPool::device_contexts() const { + if (external_device_contexts_) { + return *external_device_contexts_; + } + return device_contexts_; +} + +void DeviceContextPool::SetDeviceContexts( + const std::map>>* + dev_ctxs) { + external_device_contexts_ = dev_ctxs; +} + template inline void EmplaceDeviceContext( std::map>>* diff --git a/paddle/fluid/platform/device_context.h b/paddle/fluid/platform/device_context.h index d0dae706ba5..1855f43f9d6 100644 --- a/paddle/fluid/platform/device_context.h +++ b/paddle/fluid/platform/device_context.h @@ -57,7 +57,7 @@ limitations under the License. */ #endif #ifdef PADDLE_WITH_MKLDNN -#include "dnnl.hpp" +#include "dnnl.hpp" // NOLINT #include "paddle/fluid/framework/data_layout.h" #endif @@ -915,17 +915,22 @@ class DeviceContextPool { const typename DefaultDeviceContextType::TYPE*>(Get(place)); } - size_t size() const { return device_contexts_.size(); } + size_t size() const; const std::map>>& - device_contexts() const { - return device_contexts_; - } + device_contexts() const; + + static void SetDeviceContexts( + const std::map>>*); private: static DeviceContextPool* pool; std::map>> device_contexts_; + static thread_local const std::map< + Place, std::shared_future>>* + external_device_contexts_; // not owned DISABLE_COPY_AND_ASSIGN(DeviceContextPool); }; -- GitLab