From 114723c996ef1220593a3be7a9ed9deb451dba84 Mon Sep 17 00:00:00 2001
From: Ruibiao Chen
Date: Fri, 10 Jun 2022 11:20:48 +0800
Subject: [PATCH] Refactor DeviceContextPool (#42901)

* Refactor DeviceContextPool

* Adjust header file order
---
 .../fast_threaded_ssa_graph_executor.cc       |   8 +-
 .../fast_threaded_ssa_graph_executor.h        |   3 +-
 .../details/threaded_ssa_graph_executor.cc    |   7 +-
 .../details/threaded_ssa_graph_executor.h     |   4 +-
 .../framework/new_executor/stream_analyzer.cc |  24 ++-
 .../framework/new_executor/stream_analyzer.h  |  17 +-
 .../memory/allocation/allocator_facade.cc     |  21 +--
 paddle/fluid/platform/device_context.cc       | 166 +++++++++++-------
 paddle/fluid/platform/device_context.h        |  11 +-
 9 files changed, 160 insertions(+), 101 deletions(-)

diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
index 8b5c3c17987..ca1ebe18b44 100644
--- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
@@ -39,9 +39,11 @@ FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor(
       local_exec_scopes_(local_exec_scopes),
       places_(places),
       graph_(graph),
-      fetch_ctxs_(places),
       // add one more thread for generate op_deps
       prepare_pool_(1) {
+  platform::EmplaceDeviceContexts(
+      &fetch_ctxs_, places,
+      /*disable_setting_default_stream_for_allocator=*/true);
   if (ir::IsTopologySortOperationsUnique(*graph_)) {
     VLOG(10)
         << "Change thread number to 1 because the toposort order is unique";
@@ -144,7 +146,7 @@ FetchResultType FastThreadedSSAGraphExecutor::Run(
     ClearFetchOp(graph_, &fetch_ops);
 
     for (auto &place : places_) {
-      fetch_ctxs_.Get(place)->Wait();
+      fetch_ctxs_[place].get().get()->Wait();
     }
   }
 
@@ -195,7 +197,7 @@ void FastThreadedSSAGraphExecutor::InsertFetchOps(
     fetch_ops->emplace_back(op);
 
     for (auto &p : places_) {
-      op->SetDeviceContext(p, fetch_ctxs_.Get(p));
+      op->SetDeviceContext(p, fetch_ctxs_[p].get().get());
     }
 
     for (auto *var : vars) {
diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
index 19b00615715..f535a888b4e 100644
--- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
@@ -54,7 +54,8 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
   std::unordered_map<OpHandleBase *, std::atomic<int>> op_deps_;
   std::vector<OpHandleBase *> bootstrap_ops_;
 
-  platform::DeviceContextPool fetch_ctxs_;
+  std::map<Place, std::shared_future<std::unique_ptr<platform::DeviceContext>>>
+      fetch_ctxs_;
   std::atomic<int> remaining_;
 
   std::future<
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
index 39683c9a0d8..ef9b309c8d8 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
@@ -32,11 +32,14 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
       local_scopes_(local_scopes),
       local_exec_scopes_(local_exec_scopes),
       places_(places),
-      fetch_ctxs_(places),
       strategy_(strategy),
       prepare_pool_(1),
       pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_)
                                        : nullptr) {
+  platform::EmplaceDeviceContexts(
+      &fetch_ctxs_, places,
+      /*disable_setting_default_stream_for_allocator=*/true);
+
   if (strategy_.num_iteration_per_run_ > 1) {
     int read_op_num = 0;
     for (auto *node : graph_->Nodes()) {
@@ -207,7 +210,7 @@ void ThreadedSSAGraphExecutor::InsertFetchOps(
     fetch_ops->emplace_back(op);
 
     for (auto &p : places_) {
-      op->SetDeviceContext(p, fetch_ctxs_.Get(p));
+      op->SetDeviceContext(p, fetch_ctxs_[p].get().get());
     }
 
     for (auto *var : vars) {
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
index 45fa3adbf14..c9a2a7eccdc 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
@@ -77,7 +77,9 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
   std::vector<Scope *> local_exec_scopes_;
   std::vector<platform::Place> places_;
 
-  platform::DeviceContextPool fetch_ctxs_;
+  std::map<Place, std::shared_future<std::unique_ptr<platform::DeviceContext>>>
+      fetch_ctxs_;
+
   ExceptionHolder exception_holder_;
   std::unique_ptr<OpDependentData> op_deps_;
   std::future<std::unique_ptr<OpDependentData>> op_deps_futures_;
diff --git a/paddle/fluid/framework/new_executor/stream_analyzer.cc b/paddle/fluid/framework/new_executor/stream_analyzer.cc
index 6c689c8548b..469876b01f6 100644
--- a/paddle/fluid/framework/new_executor/stream_analyzer.cc
+++ b/paddle/fluid/framework/new_executor/stream_analyzer.cc
@@ -14,11 +14,31 @@
 
 #include "paddle/fluid/framework/new_executor/stream_analyzer.h"
 
+#include <future>
 #include <unordered_set>
 
+#include "paddle/fluid/platform/device_context.h"
+
 namespace paddle {
 namespace framework {
 
+StreamAnalyzer::StreamAnalyzer(const platform::Place& place) : place_(place) {
+  if (platform::is_gpu_place(place)) {
+#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
+    platform::EmplaceDeviceContexts(
+        &d2h_ctxs_, {place},
+        /*disable_setting_default_stream_for_allocator=*/true);
+    platform::EmplaceDeviceContexts(
+        &h2d_ctxs_, {place},
+        /*disable_setting_default_stream_for_allocator=*/true);
+#else
+    PADDLE_THROW(
+        platform::errors::Unimplemented("CUDAPlace is not supported. Please "
+                                        "re-compile with WITH_GPU option."));
+#endif
+  }
+}
+
 /*
  * Parse the var_ids that need to be associated with an event.
  * The caller should guarantee front_op and back_op satisfy the
@@ -137,10 +157,10 @@ platform::DeviceContext* StreamAnalyzer::ParseDeviceContext(
   auto* dev_ctx = op_func_node.dev_ctx_;
   if (op_type == interpreter::kMemcpyD2H) {
     VLOG(3) << "Get dev_ctx from d2h_context_pool_";
-    dev_ctx = d2h_ctx_pool_.Get(place_);
+    dev_ctx = d2h_ctxs_[place_].get().get();
   } else if (op_type == interpreter::kMemcpyH2D) {
     VLOG(3) << "Get dev_ctx from h2d_context_pool_";
-    dev_ctx = h2d_ctx_pool_.Get(place_);
+    dev_ctx = h2d_ctxs_[place_].get().get();
   }
 
   return dev_ctx;
diff --git a/paddle/fluid/framework/new_executor/stream_analyzer.h b/paddle/fluid/framework/new_executor/stream_analyzer.h
index 8a6552c6883..c57bab9c9c2 100644
--- a/paddle/fluid/framework/new_executor/stream_analyzer.h
+++ b/paddle/fluid/framework/new_executor/stream_analyzer.h
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #pragma once
 
+#include <future>
 #include <map>
 #include <memory>
@@ -25,15 +26,17 @@ namespace framework {
 
 class StreamAnalyzer {
  public:
-  explicit StreamAnalyzer(const platform::Place& place)
-      : place_(place), d2h_ctx_pool_({place}), h2d_ctx_pool_({place}) {}
+  using Place = platform::Place;
+  using DeviceContext = platform::DeviceContext;
+
+  explicit StreamAnalyzer(const Place& place);
 
   ~StreamAnalyzer() {}
 
   void Schedule(const std::vector<size_t>& downstream_ops,
                 std::vector<Instruction>* instructions, size_t op_index);
 
-  platform::DeviceContext* ParseDeviceContext(const OpFuncNode& op_func_node);
+  DeviceContext* ParseDeviceContext(const OpFuncNode& op_func_node);
 
  private:
   std::vector<size_t> GetNeedEventVarIds(const Instruction& cur_instr,
@@ -42,16 +45,16 @@ class StreamAnalyzer {
   void ConstructEventForVar(const std::vector<size_t>& new_event_var_id,
                             Instruction* next_instr,
                             platform::DeviceType waiter_type,
-                            const platform::Place& place);
+                            const Place& place);
 
   bool IsDirectRun(Instruction& cur_instr,  // NOLINT
                    const Instruction& next_instr);
 
   platform::DeviceType GetWaiterType(const Instruction& instr);
 
-  platform::Place place_;
-  platform::DeviceContextPool d2h_ctx_pool_;
-  platform::DeviceContextPool h2d_ctx_pool_;
+  Place place_;
+  std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>> d2h_ctxs_;
+  std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>> h2d_ctxs_;
   std::map<size_t, std::shared_ptr<platform::DeviceEvent>> var_id2event_;
 };
 
diff --git a/paddle/fluid/memory/allocation/allocator_facade.cc b/paddle/fluid/memory/allocation/allocator_facade.cc
index d72af70657a..13536be5b40 100644
--- a/paddle/fluid/memory/allocation/allocator_facade.cc
+++ b/paddle/fluid/memory/allocation/allocator_facade.cc
@@ -419,21 +419,12 @@ class AllocatorFacadePrivate {
     const std::shared_ptr<StreamSafeCUDAAllocator>& allocator =
         GetDefaultStreamSafeCUDAAllocator(place);
 
-    // NOTE(Ruibiao): The default stream will be set when the CUDADeviceContext
-    // created. Normally, the DeviceContextPool is a global singleton and one
-    // Place only correspond to one DeviceContext. However, to support
-    // multi-stream scheduling, standalone executor creates two extra
-    // DeviceContextPools for H2D and D2H stream in StreamAnalyzer, which make
-    // one Place correspond to multiple DeviceContext and unexpectedly reset the
-    // default stream in runtime. To avoid this behavior, we do not allow
-    // changing default stream after initially setting.
-    if (allocator->GetDefaultStream() != nullptr) {
-      VLOG(5) << "The default stream for StreamSafeCUDAAllocator("
-              << allocator.get() << ") in " << place << " has been set to "
-              << allocator->GetDefaultStream()
-              << " before, not allow to change now.";
-      return;
-    }
+    PADDLE_ENFORCE_EQ(
+        allocator->GetDefaultStream(), nullptr,
+        platform::errors::Unavailable(
+            "The default stream for StreamSafeCUDAAllocator(%p) in %s has been "
+            "set to %p, not allow to change it to %p.",
+            allocator.get(), place, allocator->GetDefaultStream(), stream));
 
     allocator->SetDefaultStream(stream);
     VLOG(8) << "Set default stream to " << stream
diff --git a/paddle/fluid/platform/device_context.cc b/paddle/fluid/platform/device_context.cc
index d990aab5773..8928f5417d1 100644
--- a/paddle/fluid/platform/device_context.cc
+++ b/paddle/fluid/platform/device_context.cc
@@ -11,13 +11,21 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
+
 #include "paddle/fluid/platform/device_context.h"
 
 #include <functional>
 #include <memory>
 #include <set>
 
+#include "glog/logging.h"
+#include "paddle/fluid/framework/expect.h"
+#include "paddle/fluid/framework/generator.h"
+#include "paddle/fluid/memory/allocation/allocator_facade.h"
+#include "paddle/fluid/platform/device/device_wrapper.h"
 #include "paddle/fluid/platform/place.h"
+#include "paddle/fluid/platform/profiler.h"
+#include "paddle/fluid/platform/profiler/event_tracing.h"
 #include "paddle/fluid/platform/stream/cuda_stream.h"
 #include "paddle/phi/backends/gpu/gpu_context.h"
 #include "paddle/phi/core/allocator.h"
@@ -26,17 +34,11 @@ limitations under the License. */
 #include "paddle/fluid/memory/allocation/cuda_device_context_allocator.h"
 #include "paddle/fluid/platform/cuda_device_guard.h"
 #endif
+
 #ifdef PADDLE_WITH_MLU
 #include "paddle/fluid/platform/device/mlu/device_context.h"
 #include "paddle/fluid/platform/device/mlu/device_context_allocator.h"
 #endif
-#include "glog/logging.h"
-#include "paddle/fluid/framework/expect.h"
-#include "paddle/fluid/framework/generator.h"
-#include "paddle/fluid/memory/allocation/allocator_facade.h"
-#include "paddle/fluid/platform/device/device_wrapper.h"
-#include "paddle/fluid/platform/profiler.h"
-#include "paddle/fluid/platform/profiler/event_tracing.h"
 
 namespace paddle {
 namespace memory {
@@ -178,75 +180,89 @@ void DeviceContextPool::SetDeviceContexts(
 }
 
 template <typename DevCtx>
-inline void EmplaceDeviceContext(
-    std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
-        map_ptr,
-    platform::Place p) {
+std::unique_ptr<DeviceContext> CreateDeviceContext(
+    const platform::Place& p,
+    bool disable_setting_default_stream_for_allocator = false) {
   using PtrType = std::unique_ptr<DeviceContext>;
-  map_ptr->emplace(
-      p, std::async(std::launch::deferred, [=] {
-        // lazy evaluation. i.e., only create device context at
-        // first `Get`
-        auto* dev_ctx = new DevCtx(p);
-        if (is_gpu_place(p)) {
+  auto* dev_ctx = new DevCtx(p);
+  if (is_gpu_place(p)) {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-          auto* cuda_ctx = dynamic_cast<CUDADeviceContext*>(dev_ctx);
-          PADDLE_ENFORCE_NOT_NULL(
-              cuda_ctx,
-              platform::errors::InvalidArgument(
-                  "Failed to dynamic_cast dev_ctx into CUDADeviceContext."));
-          dev_ctx->SetAllocator(memory::allocation::AllocatorFacade::Instance()
-                                    .GetAllocator(p)
-                                    .get());
-          dev_ctx->SetPinnedAllocator(
-              memory::allocation::AllocatorFacade::Instance()
-                  .GetAllocator(paddle::platform::CUDAPinnedPlace())
-                  .get());
-
-          cuda_ctx->PartialInitWithAllocator();
-          dev_ctx->SetGenerator(
-              framework::DefaultCUDAGenerator(p.GetDeviceId()).get());
-#endif
-        } else {
-          dev_ctx->SetAllocator(memory::allocation::AllocatorFacade::Instance()
-                                    .GetAllocator(p)
-                                    .get());
-          dev_ctx->SetGenerator(framework::DefaultCPUGenerator().get());
-        }
-        dev_ctx->SetHostGenerator(framework::DefaultCPUGenerator().get());
-        dev_ctx->SetHostAllocator(
-            memory::allocation::AllocatorFacade::Instance()
-                .GetAllocator(platform::CPUPlace())
-                .get());
-        dev_ctx->SetZeroAllocator(
-            memory::allocation::AllocatorFacade::Instance()
-                .GetZeroAllocator(p)
-                .get());
-        return PtrType(dev_ctx);
-      }));
+    auto* cuda_ctx = dynamic_cast<CUDADeviceContext*>(dev_ctx);
+    PADDLE_ENFORCE_NOT_NULL(
+        cuda_ctx,
+        platform::errors::InvalidArgument(
+            "Failed to dynamic_cast dev_ctx into CUDADeviceContext."));
+
+    auto& instance = memory::allocation::AllocatorFacade::Instance();
+    if (!disable_setting_default_stream_for_allocator) {
+      instance.SetDefaultStream(CUDAPlace(p.GetDeviceId()), cuda_ctx->stream());
+    }
+    dev_ctx->SetAllocator(instance.GetAllocator(p).get());
+    dev_ctx->SetPinnedAllocator(
+        instance.GetAllocator(paddle::platform::CUDAPinnedPlace()).get());
+
+    cuda_ctx->PartialInitWithAllocator();
+    dev_ctx->SetGenerator(
+        framework::DefaultCUDAGenerator(p.GetDeviceId()).get());
+#endif
+  } else {
+    dev_ctx->SetAllocator(
+        memory::allocation::AllocatorFacade::Instance().GetAllocator(p).get());
+    dev_ctx->SetGenerator(framework::DefaultCPUGenerator().get());
+  }
+  dev_ctx->SetHostGenerator(framework::DefaultCPUGenerator().get());
+  dev_ctx->SetHostAllocator(memory::allocation::AllocatorFacade::Instance()
+                                .GetAllocator(platform::CPUPlace())
+                                .get());
+  dev_ctx->SetZeroAllocator(memory::allocation::AllocatorFacade::Instance()
+                                .GetZeroAllocator(p)
+                                .get());
+  return PtrType(dev_ctx);
 }
 
-DeviceContextPool::DeviceContextPool(
-    const std::vector<platform::Place>& places) {
+template <typename DevCtx>
+inline void EmplaceDeviceContext(
+    std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
+        place_to_device_context,
+    platform::Place place, bool disable_setting_default_stream_for_allocator) {
+  // lazy evaluation. i.e., only create device context at first `Get`
+  place_to_device_context->emplace(
+      place, std::async(std::launch::deferred, CreateDeviceContext<DevCtx>,
+                        place, disable_setting_default_stream_for_allocator));
+}
+
+void EmplaceDeviceContexts(
+    std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
+        place_to_device_context,
+    const std::vector<platform::Place>& places,
+    bool disable_setting_default_stream_for_allocator) {
   PADDLE_ENFORCE_GT(
       places.size(), 0,
       platform::errors::InvalidArgument("The number of platform places should "
                                         "be larger than 0. But received %d.",
                                         places.size()));
+
   std::set<Place> set;
   for (auto& p : places) {
     set.insert(p);
   }
+
   for (auto& p : set) {
     if (platform::is_cpu_place(p)) {
 #ifdef PADDLE_WITH_MKLDNN
-      EmplaceDeviceContext<MKLDNNDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<MKLDNNDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
-      EmplaceDeviceContext<CPUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<CPUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #endif
     } else if (platform::is_gpu_place(p)) {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-      EmplaceDeviceContext<CUDADeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<CUDADeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(
           platform::errors::Unimplemented("CUDAPlace is not supported. Please "
@@ -254,7 +270,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_cuda_pinned_place(p)) {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-      EmplaceDeviceContext<CUDAPinnedDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<CUDAPinnedDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(platform::errors::Unimplemented(
           "CUDAPlace is not supported. Please re-compile with WITH_GPU "
@@ -262,7 +280,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_xpu_place(p)) {
 #ifdef PADDLE_WITH_XPU
-      EmplaceDeviceContext<XPUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<XPUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(
           platform::errors::Unimplemented("XPUPlace is not supported. Please "
@@ -270,7 +290,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_mlu_place(p)) {
 #ifdef PADDLE_WITH_MLU
-      EmplaceDeviceContext<MLUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<MLUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(
           platform::errors::Unimplemented("MLUPlace is not supported. Please "
@@ -278,7 +300,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_ipu_place(p)) {
 #ifdef PADDLE_WITH_IPU
-      EmplaceDeviceContext<IPUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<IPUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(
           platform::errors::Unimplemented("IPUPlace is not supported. Please "
@@ -286,7 +310,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_npu_place(p)) {
 #ifdef PADDLE_WITH_ASCEND_CL
-      EmplaceDeviceContext<NPUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<NPUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(platform::errors::Unimplemented(
           "NPUPlace is not supported. Please "
@@ -294,7 +320,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_npu_pinned_place(p)) {
 #ifdef PADDLE_WITH_ASCEND_CL
-      EmplaceDeviceContext<NPUPinnedDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<NPUPinnedDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(platform::errors::Unimplemented(
           "NPUPinnedPlace is not supported. Please re-compile with "
@@ -303,7 +331,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_custom_place(p)) {
 #ifdef PADDLE_WITH_CUSTOM_DEVICE
-      EmplaceDeviceContext<CustomDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<CustomDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(platform::errors::Unimplemented(
           "CustomPlace is not supported. Please re-compile with "
@@ -314,6 +344,12 @@ DeviceContextPool::DeviceContextPool(
     }
   }
 }
 
+DeviceContextPool::DeviceContextPool(
+    const std::vector<platform::Place>& places) {
+  EmplaceDeviceContexts(&device_contexts_, places,
+                        /*disable_setting_default_stream_for_allocator=*/false);
+}
+
 CPUDeviceContext::CPUDeviceContext() : phi::CPUContext() {
   phi::CPUContext::Init();
 }
@@ -556,10 +592,6 @@ CUDAContext::~CUDAContext() {
 CUDADeviceContext::CUDADeviceContext(CUDAPlace place) : phi::GPUContext(place) {
   phi::GPUContext::PartialInitWithoutAllocator();
   cuda_stream_.reset(new stream::CUDAStream(phi::GPUContext::stream(), place));
-  auto& instance = memory::allocation::AllocatorFacade::Instance();
-  instance.SetDefaultStream(place, phi::GPUContext::stream());
-  workspace_.reset(new phi::DnnWorkspaceHandle(
-      instance.GetAllocator(place).get(), stream()));
 }
 
 CUDADeviceContext::~CUDADeviceContext() = default;
diff --git a/paddle/fluid/platform/device_context.h b/paddle/fluid/platform/device_context.h
index 1855f43f9d6..9ba9307d289 100644
--- a/paddle/fluid/platform/device_context.h
+++ b/paddle/fluid/platform/device_context.h
@@ -645,7 +645,6 @@ class CUDADeviceContext : public phi::GPUContext {
   // NOTE: Just for compatibility with the past, please delete if there is an
   // elegant way.
   std::unique_ptr<stream::CUDAStream> cuda_stream_;
-  std::unique_ptr<phi::DnnWorkspaceHandle> workspace_{nullptr};
 
   DISABLE_COPY_AND_ASSIGN(CUDADeviceContext);
 };
@@ -883,11 +882,15 @@ struct DefaultDeviceContextType {
 };
 #endif
 
+void EmplaceDeviceContexts(
+    std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
+        place_to_device_context,
+    const std::vector<platform::Place>& places,
+    bool disable_setting_default_stream_for_allocator);
+
 /*! \brief device context pool singleton */
 class DeviceContextPool {
  public:
-  explicit DeviceContextPool(const std::vector<platform::Place>& places);
-
   static DeviceContextPool& Instance() {
     PADDLE_ENFORCE_NOT_NULL(pool,
                             platform::errors::PreconditionNotMet(
@@ -925,6 +928,8 @@ class DeviceContextPool {
                      std::shared_future<std::unique_ptr<DeviceContext>>>*);
 
  private:
+  explicit DeviceContextPool(const std::vector<platform::Place>& places);
+
   static DeviceContextPool* pool;
   std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>
       device_contexts_;
-- 
GitLab
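Note on the container shape used throughout this patch: every former platform::DeviceContextPool member becomes a std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>> filled by EmplaceDeviceContexts, and callers now reach a context with ctxs[place].get().get(). The standalone sketch below (not Paddle code; FakePlace, FakeContext, and EmplaceContexts are made-up stand-ins) shows why that works: a deferred std::async task postpones construction until the first get(), and the shared_future caches the constructed object for every later caller.

// Standalone illustration only -- not part of the patch. All names here
// (FakePlace, FakeContext, EmplaceContexts) are hypothetical stand-ins.
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

struct FakePlace {
  std::string name;
  bool operator<(const FakePlace& other) const { return name < other.name; }
};

struct FakeContext {
  explicit FakeContext(const FakePlace& p) : place(p) {
    std::cout << "created context for " << p.name << "\n";
  }
  FakePlace place;
};

// Same container shape as the patch: one shared_future per place.
using ContextMap =
    std::map<FakePlace, std::shared_future<std::unique_ptr<FakeContext>>>;

// std::launch::deferred postpones construction until the first wait/get,
// which is the lazy-creation behavior the real EmplaceDeviceContext keeps.
void EmplaceContexts(ContextMap* ctxs, const std::vector<FakePlace>& places) {
  for (const auto& p : places) {
    ctxs->emplace(p, std::async(std::launch::deferred, [p] {
                    return std::make_unique<FakeContext>(p);
                  }));
  }
}

int main() {
  ContextMap ctxs;
  EmplaceContexts(&ctxs, {{"cpu"}, {"gpu:0"}});
  // No context exists yet; the first .get() runs the deferred task, and the
  // shared_future caches the result, so repeated lookups return the same
  // object -- the counterpart of fetch_ctxs_[place].get().get() in the patch.
  FakeContext* gpu = ctxs[FakePlace{"gpu:0"}].get().get();
  std::cout << "using context for " << gpu->place.name << "\n";
  return 0;
}

A C++14 compiler is enough to try it, e.g. g++ -std=c++14 sketch.cc && ./a.out.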