diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
index 8b5c3c179878090e450d7aa7eeecc6b67b1b3c72..ca1ebe18b44d262c4c3161727adf037c149035ea 100644
--- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
@@ -39,9 +39,11 @@ FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor(
       local_exec_scopes_(local_exec_scopes),
       places_(places),
       graph_(graph),
-      fetch_ctxs_(places),
       // add one more thread for generate op_deps
       prepare_pool_(1) {
+  platform::EmplaceDeviceContexts(
+      &fetch_ctxs_, places,
+      /*disable_setting_default_stream_for_allocator=*/true);
   if (ir::IsTopologySortOperationsUnique(*graph_)) {
     VLOG(10)
         << "Change thread number to 1 because the toposort order is unique";
@@ -144,7 +146,7 @@ FetchResultType FastThreadedSSAGraphExecutor::Run(
     ClearFetchOp(graph_, &fetch_ops);
 
     for (auto &place : places_) {
-      fetch_ctxs_.Get(place)->Wait();
+      fetch_ctxs_[place].get().get()->Wait();
     }
   }
 
@@ -195,7 +197,7 @@ void FastThreadedSSAGraphExecutor::InsertFetchOps(
     fetch_ops->emplace_back(op);
 
     for (auto &p : places_) {
-      op->SetDeviceContext(p, fetch_ctxs_.Get(p));
+      op->SetDeviceContext(p, fetch_ctxs_[p].get().get());
     }
 
     for (auto *var : vars) {
diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
index 19b0061571596848568e90cec9f8fbb459ec6603..f535a888b4e3691d0a3135b0f7234a84d981398d 100644
--- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
@@ -54,7 +54,8 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
   std::unordered_map<OpHandleBase *, std::atomic<int>> op_deps_;
   std::vector<OpHandleBase *> bootstrap_ops_;
-  platform::DeviceContextPool fetch_ctxs_;
+  std::map<Place, std::shared_future<std::unique_ptr<platform::DeviceContext>>>
+      fetch_ctxs_;
   std::atomic<int> remaining_;
 
   std::future<
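Note on the new fetch_ctxs_ type above: the mapped type is a deferred std::shared_future, so the DeviceContext is only constructed the first time it is requested, and every access needs two get() calls (one to resolve the future, one to unwrap the unique_ptr), which is why the executors now read fetch_ctxs_[place].get().get(). A minimal standalone sketch of that pattern, using a toy Ctx type rather than Paddle's platform::DeviceContext:

```cpp
// Standalone sketch (toy Ctx type, not Paddle's platform::DeviceContext) of
// the lazy map that replaces platform::DeviceContextPool in the executors:
// the context is built by a deferred std::async task on first access, and
// m[key].get().get() first resolves the shared_future, then unwraps the
// unique_ptr into a raw pointer.
#include <future>
#include <iostream>
#include <map>
#include <memory>

struct Ctx {
  void Wait() const { std::cout << "wait\n"; }
};

int main() {
  std::map<int, std::shared_future<std::unique_ptr<Ctx>>> ctxs;  // key ~ Place
  ctxs.emplace(0, std::async(std::launch::deferred, [] {
                 return std::unique_ptr<Ctx>(new Ctx);  // created lazily, once
               }));

  // Mirrors fetch_ctxs_[place].get().get()->Wait() in the hunks above.
  ctxs[0].get().get()->Wait();
  return 0;
}
```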
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
index 39683c9a0d868061cb63df6ab11362b15b5a8742..ef9b309c8d80f1299365083aa65e1b643f20fd2f 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
@@ -32,11 +32,14 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
       local_scopes_(local_scopes),
       local_exec_scopes_(local_exec_scopes),
       places_(places),
-      fetch_ctxs_(places),
       strategy_(strategy),
       prepare_pool_(1),
       pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_)
                                        : nullptr) {
+  platform::EmplaceDeviceContexts(
+      &fetch_ctxs_, places,
+      /*disable_setting_default_stream_for_allocator=*/true);
+
   if (strategy_.num_iteration_per_run_ > 1) {
     int read_op_num = 0;
     for (auto *node : graph_->Nodes()) {
@@ -207,7 +210,7 @@ void ThreadedSSAGraphExecutor::InsertFetchOps(
     fetch_ops->emplace_back(op);
 
     for (auto &p : places_) {
-      op->SetDeviceContext(p, fetch_ctxs_.Get(p));
+      op->SetDeviceContext(p, fetch_ctxs_[p].get().get());
     }
 
     for (auto *var : vars) {
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
index 45fa3adbf14080317fe004a7113b58d34145447d..c9a2a7eccdcde3fc1fbc9d7a0be5650567870c56 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
@@ -77,7 +77,9 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
   std::vector<Scope *> local_exec_scopes_;
   std::vector<platform::Place> places_;
-  platform::DeviceContextPool fetch_ctxs_;
+  std::map<Place, std::shared_future<std::unique_ptr<platform::DeviceContext>>>
+      fetch_ctxs_;
+
   ExceptionHolder exception_holder_;
   std::unique_ptr<OpDependentData> op_deps_;
   std::future<std::unique_ptr<OpDependentData>> op_deps_futures_;
diff --git a/paddle/fluid/framework/new_executor/stream_analyzer.cc b/paddle/fluid/framework/new_executor/stream_analyzer.cc
index 6c689c8548b90244e6ba667c227aa194b44ada1b..469876b01f654b6b82b7fb120650f8a4137afb55 100644
--- a/paddle/fluid/framework/new_executor/stream_analyzer.cc
+++ b/paddle/fluid/framework/new_executor/stream_analyzer.cc
@@ -14,11 +14,31 @@
 
 #include "paddle/fluid/framework/new_executor/stream_analyzer.h"
 
+#include <future>
 #include <unordered_set>
 
+#include "paddle/fluid/platform/device_context.h"
+
 namespace paddle {
 namespace framework {
 
+StreamAnalyzer::StreamAnalyzer(const platform::Place& place) : place_(place) {
+  if (platform::is_gpu_place(place)) {
+#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
+    platform::EmplaceDeviceContexts(
+        &d2h_ctxs_, {place},
+        /*disable_setting_default_stream_for_allocator=*/true);
+    platform::EmplaceDeviceContexts(
+        &h2d_ctxs_, {place},
+        /*disable_setting_default_stream_for_allocator=*/true);
+#else
+    PADDLE_THROW(
+        platform::errors::Unimplemented("CUDAPlace is not supported. Please "
+                                        "re-compile with WITH_GPU option."));
+#endif
+  }
+}
+
 /*
  * Parse the var_ids that need to be associated with an event.
  * The caller should guarantee front_op and back_op satisfy the
@@ -137,10 +157,10 @@ platform::DeviceContext* StreamAnalyzer::ParseDeviceContext(
   auto* dev_ctx = op_func_node.dev_ctx_;
   if (op_type == interpreter::kMemcpyD2H) {
     VLOG(3) << "Get dev_ctx from d2h_context_pool_";
-    dev_ctx = d2h_ctx_pool_.Get(place_);
+    dev_ctx = d2h_ctxs_[place_].get().get();
   } else if (op_type == interpreter::kMemcpyH2D) {
     VLOG(3) << "Get dev_ctx from h2d_context_pool_";
-    dev_ctx = h2d_ctx_pool_.Get(place_);
+    dev_ctx = h2d_ctxs_[place_].get().get();
   }
 
   return dev_ctx;
diff --git a/paddle/fluid/framework/new_executor/stream_analyzer.h b/paddle/fluid/framework/new_executor/stream_analyzer.h
index 8a6552c6883c5bd9916d5e2c8b1685e4a5c8aac5..c57bab9c9c2d0fe1c1e4e68f48f19a9e93e49312 100644
--- a/paddle/fluid/framework/new_executor/stream_analyzer.h
+++ b/paddle/fluid/framework/new_executor/stream_analyzer.h
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #pragma once
+#include <future>
 #include <memory>
 #include <vector>
 
@@ -25,15 +26,17 @@ namespace framework {
 
 class StreamAnalyzer {
  public:
-  explicit StreamAnalyzer(const platform::Place& place)
-      : place_(place), d2h_ctx_pool_({place}), h2d_ctx_pool_({place}) {}
+  using Place = platform::Place;
+  using DeviceContext = platform::DeviceContext;
+
+  explicit StreamAnalyzer(const Place& place);
 
   ~StreamAnalyzer() {}
 
   void Schedule(const std::vector<size_t>& downstream_ops,
                 std::vector<Instruction>* instructions, size_t op_index);
 
-  platform::DeviceContext* ParseDeviceContext(const OpFuncNode& op_func_node);
+  DeviceContext* ParseDeviceContext(const OpFuncNode& op_func_node);
 
  private:
   std::vector<size_t> GetNeedEventVarIds(const Instruction& cur_instr,
@@ -42,16 +45,16 @@ class StreamAnalyzer {
   void ConstructEventForVar(const std::vector<size_t>& new_event_var_id,
                             Instruction* next_instr,
                             platform::DeviceType waiter_type,
-                            const platform::Place& place);
+                            const Place& place);
 
   bool IsDirectRun(Instruction& cur_instr,  // NOLINT
                    const Instruction& next_instr);
 
   platform::DeviceType GetWaiterType(const Instruction& instr);
 
-  platform::Place place_;
-  platform::DeviceContextPool d2h_ctx_pool_;
-  platform::DeviceContextPool h2d_ctx_pool_;
+  Place place_;
+  std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>> d2h_ctxs_;
+  std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>> h2d_ctxs_;
 
   std::map<size_t, std::shared_ptr<platform::DeviceEvent>> var_id2event_;
 };
diff --git a/paddle/fluid/memory/allocation/allocator_facade.cc b/paddle/fluid/memory/allocation/allocator_facade.cc
index d72af70657a29faad1ae905266cf8b47d9867f4b..13536be5b40fe3d325c727a3f507b154dae19de1 100644
--- a/paddle/fluid/memory/allocation/allocator_facade.cc
+++ b/paddle/fluid/memory/allocation/allocator_facade.cc
@@ -419,21 +419,12 @@ class AllocatorFacadePrivate {
     const std::shared_ptr<StreamSafeCUDAAllocator>& allocator =
        GetDefaultStreamSafeCUDAAllocator(place);
 
-    // NOTE(Ruibiao): The default stream will be set when the CUDADeviceContext
-    // created. Normally, the DeviceContextPool is a global singleton and one
-    // Place only correspond to one DeviceContext. However, to support
-    // multi-stream scheduling, standalone executor creates two extra
-    // DeviceContextPools for H2D and D2H stream in StreamAnalyzer, which make
-    // one Place correspond to multiple DeviceContext and unexpectedly reset the
-    // default stream in runtime. To avoid this behavior, we do not allow
-    // changing default stream after initially setting.
-    if (allocator->GetDefaultStream() != nullptr) {
-      VLOG(5) << "The default stream for StreamSafeCUDAAllocator("
-              << allocator.get() << ") in " << place << " has been set to "
-              << allocator->GetDefaultStream()
-              << " before, not allow to change now.";
-      return;
-    }
+    PADDLE_ENFORCE_EQ(
+        allocator->GetDefaultStream(), nullptr,
+        platform::errors::Unavailable(
+            "The default stream for StreamSafeCUDAAllocator(%p) in %s has been "
+            "set to %p, not allow to change it to %p.",
+            allocator.get(), place, allocator->GetDefaultStream(), stream));
 
     allocator->SetDefaultStream(stream);
     VLOG(8) << "Set default stream to " << stream
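Note on the allocator change above: setting the default stream a second time is now a hard error instead of a logged no-op, and this only stays safe because every extra DeviceContext created for the same place passes disable_setting_default_stream_for_allocator=true (see the device_context.cc changes below). A toy sketch of that contract, with stand-in types rather than Paddle's StreamSafeCUDAAllocator or CUDADeviceContext:

```cpp
// Toy sketch of the "set the default stream exactly once" contract; these
// types are stand-ins, not Paddle's allocator or device context classes.
#include <cassert>
#include <cstdio>

struct ToyAllocator {
  void* default_stream = nullptr;
  void SetDefaultStream(void* stream) {
    // Counterpart of the PADDLE_ENFORCE_EQ above: a second call is an error.
    assert(default_stream == nullptr && "default stream may only be set once");
    default_stream = stream;
  }
};

struct ToyContext {
  void* stream;
  ToyContext(void* s, ToyAllocator* alloc, bool disable_setting_default_stream)
      : stream(s) {
    if (!disable_setting_default_stream) {
      alloc->SetDefaultStream(stream);  // only the pool-owned context does this
    }
  }
};

int main() {
  ToyAllocator alloc;
  int s0 = 0, s1 = 1, s2 = 2;  // stand-ins for three CUDA streams
  ToyContext pool_ctx(&s0, &alloc, /*disable_setting_default_stream=*/false);
  ToyContext d2h_ctx(&s1, &alloc, /*disable_setting_default_stream=*/true);
  ToyContext h2d_ctx(&s2, &alloc, /*disable_setting_default_stream=*/true);
  std::printf("default stream stays s0: %d\n", alloc.default_stream == &s0);
  return 0;
}
```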
diff --git a/paddle/fluid/platform/device_context.cc b/paddle/fluid/platform/device_context.cc
index d990aab57736d971e013acd6cf77acd3015d7e29..8928f5417d1a52a7406b31289d9dc39e3e344de0 100644
--- a/paddle/fluid/platform/device_context.cc
+++ b/paddle/fluid/platform/device_context.cc
@@ -11,13 +11,21 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
+
 #include "paddle/fluid/platform/device_context.h"
 
 #include <functional>
 #include <memory>
 #include <set>
 
+#include "glog/logging.h"
+#include "paddle/fluid/framework/expect.h"
+#include "paddle/fluid/framework/generator.h"
+#include "paddle/fluid/memory/allocation/allocator_facade.h"
+#include "paddle/fluid/platform/device/device_wrapper.h"
 #include "paddle/fluid/platform/place.h"
+#include "paddle/fluid/platform/profiler.h"
+#include "paddle/fluid/platform/profiler/event_tracing.h"
 #include "paddle/fluid/platform/stream/cuda_stream.h"
 #include "paddle/phi/backends/gpu/gpu_context.h"
 #include "paddle/phi/core/allocator.h"
@@ -26,17 +34,11 @@ limitations under the License. */
 #include "paddle/fluid/memory/allocation/cuda_device_context_allocator.h"
 #include "paddle/fluid/platform/cuda_device_guard.h"
 #endif
+
 #ifdef PADDLE_WITH_MLU
 #include "paddle/fluid/platform/device/mlu/device_context.h"
 #include "paddle/fluid/platform/device/mlu/device_context_allocator.h"
 #endif
-#include "glog/logging.h"
-#include "paddle/fluid/framework/expect.h"
-#include "paddle/fluid/framework/generator.h"
-#include "paddle/fluid/memory/allocation/allocator_facade.h"
-#include "paddle/fluid/platform/device/device_wrapper.h"
-#include "paddle/fluid/platform/profiler.h"
-#include "paddle/fluid/platform/profiler/event_tracing.h"
 
 namespace paddle {
 namespace memory {
@@ -178,75 +180,89 @@ void DeviceContextPool::SetDeviceContexts(
 }
 
 template <typename DevCtx>
-inline void EmplaceDeviceContext(
-    std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
-        map_ptr,
-    platform::Place p) {
+std::unique_ptr<DeviceContext> CreateDeviceContext(
+    const platform::Place& p,
+    bool disable_setting_default_stream_for_allocator = false) {
   using PtrType = std::unique_ptr<DeviceContext>;
-  map_ptr->emplace(
-      p, std::async(std::launch::deferred, [=] {
-        // lazy evaluation. i.e., only create device context at
-        // first `Get`
-        auto* dev_ctx = new DevCtx(p);
-        if (is_gpu_place(p)) {
+  auto* dev_ctx = new DevCtx(p);
+  if (is_gpu_place(p)) {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-          auto* cuda_ctx = dynamic_cast<CUDADeviceContext*>(dev_ctx);
-          PADDLE_ENFORCE_NOT_NULL(
-              cuda_ctx,
-              platform::errors::InvalidArgument(
-                  "Failed to dynamic_cast dev_ctx into CUDADeviceContext."));
-          dev_ctx->SetAllocator(memory::allocation::AllocatorFacade::Instance()
-                                    .GetAllocator(p)
-                                    .get());
-          dev_ctx->SetPinnedAllocator(
-              memory::allocation::AllocatorFacade::Instance()
-                  .GetAllocator(paddle::platform::CUDAPinnedPlace())
-                  .get());
-
-          cuda_ctx->PartialInitWithAllocator();
-          dev_ctx->SetGenerator(
-              framework::DefaultCUDAGenerator(p.GetDeviceId()).get());
-#endif
-        } else {
-          dev_ctx->SetAllocator(memory::allocation::AllocatorFacade::Instance()
-                                    .GetAllocator(p)
-                                    .get());
-          dev_ctx->SetGenerator(framework::DefaultCPUGenerator().get());
-        }
-        dev_ctx->SetHostGenerator(framework::DefaultCPUGenerator().get());
-        dev_ctx->SetHostAllocator(
-            memory::allocation::AllocatorFacade::Instance()
-                .GetAllocator(platform::CPUPlace())
-                .get());
-        dev_ctx->SetZeroAllocator(
-            memory::allocation::AllocatorFacade::Instance()
-                .GetZeroAllocator(p)
-                .get());
-        return PtrType(dev_ctx);
-      }));
+    auto* cuda_ctx = dynamic_cast<CUDADeviceContext*>(dev_ctx);
+    PADDLE_ENFORCE_NOT_NULL(
+        cuda_ctx,
+        platform::errors::InvalidArgument(
+            "Failed to dynamic_cast dev_ctx into CUDADeviceContext."));
+
+    auto& instance = memory::allocation::AllocatorFacade::Instance();
+    if (!disable_setting_default_stream_for_allocator) {
+      instance.SetDefaultStream(CUDAPlace(p.GetDeviceId()), cuda_ctx->stream());
+    }
+    dev_ctx->SetAllocator(instance.GetAllocator(p).get());
+    dev_ctx->SetPinnedAllocator(
+        instance.GetAllocator(paddle::platform::CUDAPinnedPlace()).get());
+
+    cuda_ctx->PartialInitWithAllocator();
+    dev_ctx->SetGenerator(
+        framework::DefaultCUDAGenerator(p.GetDeviceId()).get());
+#endif
+  } else {
+    dev_ctx->SetAllocator(
+        memory::allocation::AllocatorFacade::Instance().GetAllocator(p).get());
+    dev_ctx->SetGenerator(framework::DefaultCPUGenerator().get());
+  }
+  dev_ctx->SetHostGenerator(framework::DefaultCPUGenerator().get());
+  dev_ctx->SetHostAllocator(memory::allocation::AllocatorFacade::Instance()
+                                .GetAllocator(platform::CPUPlace())
+                                .get());
+  dev_ctx->SetZeroAllocator(memory::allocation::AllocatorFacade::Instance()
+                                .GetZeroAllocator(p)
+                                .get());
+  return PtrType(dev_ctx);
 }
 
-DeviceContextPool::DeviceContextPool(
-    const std::vector<platform::Place>& places) {
+template <typename DevCtx>
+inline void EmplaceDeviceContext(
+    std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
+        place_to_device_context,
+    platform::Place place, bool disable_setting_default_stream_for_allocator) {
+  // lazy evaluation. i.e., only create device context at first `Get`
+  place_to_device_context->emplace(
+      place, std::async(std::launch::deferred, CreateDeviceContext<DevCtx>,
+                        place, disable_setting_default_stream_for_allocator));
+}
+
+void EmplaceDeviceContexts(
+    std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
+        place_to_device_context,
+    const std::vector<platform::Place>& places,
+    bool disable_setting_default_stream_for_allocator) {
   PADDLE_ENFORCE_GT(
       places.size(), 0,
       platform::errors::InvalidArgument("The number of platform places should "
                                         "be larger than 0. But received %d.",
                                         places.size()));
+
   std::set<Place> set;
   for (auto& p : places) {
     set.insert(p);
   }
+
   for (auto& p : set) {
     if (platform::is_cpu_place(p)) {
 #ifdef PADDLE_WITH_MKLDNN
-      EmplaceDeviceContext<MKLDNNDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<MKLDNNDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
-      EmplaceDeviceContext<CPUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<CPUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #endif
     } else if (platform::is_gpu_place(p)) {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-      EmplaceDeviceContext<CUDADeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<CUDADeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(
           platform::errors::Unimplemented("CUDAPlace is not supported. Please "
@@ -254,7 +270,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_cuda_pinned_place(p)) {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
-      EmplaceDeviceContext<CUDAPinnedDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<CUDAPinnedDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(platform::errors::Unimplemented(
           "CUDAPlace is not supported. Please re-compile with WITH_GPU "
@@ -262,7 +280,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_xpu_place(p)) {
 #ifdef PADDLE_WITH_XPU
-      EmplaceDeviceContext<XPUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<XPUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(
           platform::errors::Unimplemented("XPUPlace is not supported. Please "
@@ -270,7 +290,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_mlu_place(p)) {
 #ifdef PADDLE_WITH_MLU
-      EmplaceDeviceContext<MLUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<MLUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(
           platform::errors::Unimplemented("MLUPlace is not supported. Please "
@@ -278,7 +300,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_ipu_place(p)) {
 #ifdef PADDLE_WITH_IPU
-      EmplaceDeviceContext<IPUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<IPUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(
           platform::errors::Unimplemented("IPUPlace is not supported. Please "
@@ -286,7 +310,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_npu_place(p)) {
 #ifdef PADDLE_WITH_ASCEND_CL
-      EmplaceDeviceContext<NPUDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<NPUDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(platform::errors::Unimplemented(
           "NPUPlace is not supported. Please "
@@ -294,7 +320,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_npu_pinned_place(p)) {
 #ifdef PADDLE_WITH_ASCEND_CL
-      EmplaceDeviceContext<NPUPinnedDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<NPUPinnedDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(platform::errors::Unimplemented(
           "NPUPinnedPlace is not supported. Please re-compile with "
@@ -303,7 +331,9 @@ DeviceContextPool::DeviceContextPool(
 #endif
     } else if (platform::is_custom_place(p)) {
 #ifdef PADDLE_WITH_CUSTOM_DEVICE
-      EmplaceDeviceContext<CustomDeviceContext>(&device_contexts_, p);
+      EmplaceDeviceContext<CustomDeviceContext>(
+          place_to_device_context, p,
+          disable_setting_default_stream_for_allocator);
 #else
       PADDLE_THROW(platform::errors::Unimplemented(
           "CustomPlace is not supported. Please re-compile with "
@@ -314,6 +344,12 @@ DeviceContextPool::DeviceContextPool(
   }
 }
 
+DeviceContextPool::DeviceContextPool(
+    const std::vector<platform::Place>& places) {
+  EmplaceDeviceContexts(&device_contexts_, places,
+                        /*disable_setting_default_stream_for_allocator=*/false);
+}
+
 CPUDeviceContext::CPUDeviceContext() : phi::CPUContext() {
   phi::CPUContext::Init();
 }
@@ -556,10 +592,6 @@ CUDAContext::~CUDAContext() {
 CUDADeviceContext::CUDADeviceContext(CUDAPlace place) : phi::GPUContext(place) {
   phi::GPUContext::PartialInitWithoutAllocator();
   cuda_stream_.reset(new stream::CUDAStream(phi::GPUContext::stream(), place));
-  auto& instance = memory::allocation::AllocatorFacade::Instance();
-  instance.SetDefaultStream(place, phi::GPUContext::stream());
-  workspace_.reset(new phi::DnnWorkspaceHandle(
-      instance.GetAllocator(place).get(), stream()));
 }
 
 CUDADeviceContext::~CUDADeviceContext() = default;
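Note on the CreateDeviceContext refactor above: the deferred capturing lambda is replaced by a named factory template whose arguments are forwarded through std::async, so the new flag can be threaded through without growing a lambda capture. A standalone sketch of that std::async usage, with toy types rather than Paddle's:

```cpp
// Standalone sketch (toy types, not Paddle's) of the std::async usage above:
// a named factory template plus its arguments is handed to std::async with
// std::launch::deferred, so nothing runs until the future is first queried.
#include <future>
#include <iostream>
#include <memory>

struct Base {
  virtual ~Base() = default;
};
struct CpuCtx : Base {};

template <typename T>
std::unique_ptr<Base> Create(int place, bool disable_default_stream) {
  std::cout << "creating context for place " << place
            << " (disable_default_stream=" << disable_default_stream << ")\n";
  return std::unique_ptr<Base>(new T);
}

int main() {
  // Deferred: Create<CpuCtx>(0, true) only runs when get() is called, which
  // mirrors EmplaceDeviceContext passing CreateDeviceContext<DevCtx> above.
  std::shared_future<std::unique_ptr<Base>> fut =
      std::async(std::launch::deferred, Create<CpuCtx>, 0, true);
  Base* ctx = fut.get().get();
  return ctx != nullptr ? 0 : 1;
}
```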
diff --git a/paddle/fluid/platform/device_context.h b/paddle/fluid/platform/device_context.h
index 1855f43f9d6cf95154fe4cd87072a05b04b73214..9ba9307d289ebaef1f06e6a21150c4223e7f861a 100644
--- a/paddle/fluid/platform/device_context.h
+++ b/paddle/fluid/platform/device_context.h
@@ -645,7 +645,6 @@ class CUDADeviceContext : public phi::GPUContext {
   // NOTE: Just for compatibility with the past, please delete if there is an
   // elegant way.
   std::unique_ptr<stream::CUDAStream> cuda_stream_;
-  std::unique_ptr<phi::DnnWorkspaceHandle> workspace_{nullptr};
 
   DISABLE_COPY_AND_ASSIGN(CUDADeviceContext);
 };
@@ -883,11 +882,15 @@ struct DefaultDeviceContextType {
 };
 #endif
 
+void EmplaceDeviceContexts(
+    std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>*
+        place_to_device_context,
+    const std::vector<platform::Place>& places,
+    bool disable_setting_default_stream_for_allocator);
+
 /*! \brief device context pool singleton */
 class DeviceContextPool {
  public:
-  explicit DeviceContextPool(const std::vector<platform::Place>& places);
-
   static DeviceContextPool& Instance() {
     PADDLE_ENFORCE_NOT_NULL(
         pool, platform::errors::PreconditionNotMet(
@@ -925,6 +928,8 @@ class DeviceContextPool {
                      std::shared_future<std::unique_ptr<DeviceContext>>>*);
 
  private:
+  explicit DeviceContextPool(const std::vector<platform::Place>& places);
+
   static DeviceContextPool* pool;
   std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>
       device_contexts_;
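Note on the header change above: with the DeviceContextPool constructor now private, platform::EmplaceDeviceContexts is the public way to build per-place contexts. A hedged usage sketch, compilable only inside the Paddle tree; MyComponent and my_ctxs_ are illustrative names, while the map type, the flag, and the double get() come from this diff:

```cpp
// Hedged usage sketch (illustrative component, not part of this PR): how a
// framework component would own private per-place contexts after this change.
#include "paddle/fluid/platform/device_context.h"

namespace paddle {
namespace framework {

class MyComponent {
 public:
  explicit MyComponent(const platform::Place& place) {
    // Private contexts pass `true` so their streams never become the
    // allocator's default stream; only DeviceContextPool passes `false`.
    platform::EmplaceDeviceContexts(
        &my_ctxs_, {place},
        /*disable_setting_default_stream_for_allocator=*/true);
  }

  platform::DeviceContext* GetCtx(const platform::Place& place) {
    // First get() resolves the deferred future, second unwraps the unique_ptr.
    return my_ctxs_[place].get().get();
  }

 private:
  std::map<platform::Place,
           std::shared_future<std::unique_ptr<platform::DeviceContext>>>
      my_ctxs_;
};

}  // namespace framework
}  // namespace paddle
```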