diff --git a/paddle/fluid/framework/new_executor/CMakeLists.txt b/paddle/fluid/framework/new_executor/CMakeLists.txt index 3a1ce59fba995d0943f2cf134cd56e1ec109ef94..e268bce87acf11149485796f0121ece4a565a5b3 100644 --- a/paddle/fluid/framework/new_executor/CMakeLists.txt +++ b/paddle/fluid/framework/new_executor/CMakeLists.txt @@ -1,18 +1,34 @@ set(INTERPRETERCORE_DEPS op_registry device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method -graph_to_program_pass variable_helper timer monitor nan_inf_utils) +graph_to_program_pass variable_helper timer monitor nan_inf_utils interpretercore_event_garbage_collector) + +if(WITH_GPU) +list(APPEND INTERPRETERCORE_DEPS interpretercore_fast_garbage_collector) +endif() add_subdirectory(workqueue) cc_library(data_transfer SRCS data_transfer.cc DEPS enforce scope glog) cc_library(new_executor_defs SRCS new_executor_defs.cc DEPS enforce glog scope) -cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS} executor_gc_helper) +cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS garbage_collector) +cc_library(interpretercore_event_garbage_collector SRCS interpretercore_event_garbage_collector.cc DEPS interpretercore_garbage_collector) cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS} workqueue new_executor_defs data_transfer) cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog new_executor_defs) cc_library(stream_analyzer SRCS stream_analyzer.cc DEPS ${DEVICE_EVENT_LIBS} glog device_context new_executor_defs) -cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector stream_analyzer event_manager) +cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_event_garbage_collector stream_analyzer event_manager) cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore) +if(WITH_GPU OR WITH_ROCM) + if(WITH_GPU) + nv_library(interpretercore_fast_garbage_collector SRCS interpretercore_fast_garbage_collector.cc DEPS interpretercore_garbage_collector) + elseif(WITH_ROCM) + hip_library(interpretercore_fast_garbage_collector SRCS interpretercore_fast_garbage_collector.cc DEPS interpretercore_garbage_collector) + endif() + + target_link_libraries(interpretercore interpretercore_fast_garbage_collector) +endif() + +# cc_binary(standalone_executor_test SRCS standalone_executor_test.cc DEPS interpretercore standalone_executor operator op_registry executor ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} profiler) # skip win32 since wget is not installed by default on windows machine. # skip COVERAGE_CI since the test runs slowly because of instrumentation. if (WITH_TESTING AND NOT WIN32 AND NOT WITH_COVERAGE AND NOT "$ENV{CI_SKIP_CPP_TEST}" STREQUAL "ON") diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index 94a27294e8948e17f8ba630ab3097f83239344cf..9d8e4867841f8c5cceced92fb65f1f0a59436503 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -16,10 +16,15 @@ #include #include "paddle/fluid/framework/details/nan_inf_utils.h" #include "paddle/fluid/framework/details/share_tensor_buffer_functor.h" +#include "paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.h" #include "paddle/fluid/framework/new_executor/interpretercore_util.h" #include "paddle/fluid/framework/operator.h" #include "paddle/fluid/platform/profiler.h" +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) +#include "paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.h" +#endif + PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true, "Use inplace in new executor"); PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true, @@ -28,6 +33,8 @@ PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true, DECLARE_bool(check_nan_inf); DECLARE_bool(benchmark); +DECLARE_bool(fast_eager_deletion_mode); +DECLARE_bool(use_stream_safe_cuda_allocator); constexpr const char* kExceptionCaught = "ExceptionCaught"; constexpr const char* kTaskCompletion = "TaskCompletion"; @@ -37,6 +44,10 @@ namespace framework { // NOTE(Aurelius84): Need a better strategy to determine it. static constexpr size_t kHostNumThreads = 4; +bool IsInterpretercoreFastGCEnabled() { + return FLAGS_fast_eager_deletion_mode && FLAGS_use_stream_safe_cuda_allocator; +} + InterpreterCore::InterpreterCore(const platform::Place& place, const BlockDesc& block, VariableScope* global_scope) @@ -47,7 +58,16 @@ InterpreterCore::InterpreterCore(const platform::Place& place, is_build_ = false; async_work_queue_.reset( new interpreter::AsyncWorkQueue(kHostNumThreads, &main_thread_blocker_)); - gc_.reset(new InterpreterCoreGarbageCollector()); + +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + if (IsInterpretercoreFastGCEnabled()) { + gc_ = std::make_unique(); + } else { + gc_ = std::make_unique(); + } +#else + gc_ = std::make_unique(); +#endif exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught); completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion); @@ -513,7 +533,10 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) { try { RunInstruction(instr_node); - // GC infomation + +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + RecordStreamForGC(instr_node); +#endif CheckGC(instr_node); } catch (platform::EnforceNotMet& ex) { framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex); @@ -552,6 +575,99 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) { } } +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) +void InterpreterCore::RecordStreamForGC(const Instruction& instr) { + if (!IsInterpretercoreFastGCEnabled() || + instr.KernelType() != OpFuncType::kQueueAsync) { + return; + } + + gpuStream_t stream = reinterpret_cast( + instr.DeviceContext()) + .stream(); + auto TensorRecordStream = [&stream](Tensor& tensor) { + auto allocation = tensor.Holder(); + if (allocation == nullptr) { + return; + } + + const platform::Place& place = allocation->place(); + if (platform::is_gpu_place(place)) { + memory::RecordStream(allocation, stream); + } else if (platform::is_cuda_pinned_place(place)) { + // TODO(Ruibiao): Here should do something to make sure that the tensor is + // not freed until the H2D copies done. However, simplely launch a CUDA + // runtime callback to the H2D stream may lead a high performance + // overhead. As all the cases we meet in H2D are copies from CPUPlace at + // present, we just log a WARNING here. A better design is required. + LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous " + "manner may lead a data inconsistent"; + } else { + // memory copies involve CPUPlace are always synchronous, so just do + // nothing here + } + }; + + /* NOTE(Ruibiao):Cross-stream tensor synchronization is required only when + * all the following conditions are satisfied: + * 1. The tensor will be GC after running the instruction, i.e., in + * instr.GCCheckVars. + * 2. The stream which initializes this tensor is different from the stream + * which the instruction run in. + * 3. The tensor is the instruction's input, cause we assume that instruction + * will initialize all output tensors with its running stream. + * 4. In the OP function of this instruction, the tensor is an input of a + * async CUDA kernel. + * + * Here we only process the first condition, because: + * 1. Since the RecordStream function will directly return when the recored + * stream is equal to the owning stream, recording a stream same as which + * initialized this tensor has less time overhead. Conversely, it may take + * more time if we try to extract those cross-stream input vars from + * instr.GCCheckVars. + * 2. Now the instruction has no idea of which vars involving async running in + * OP function, and thus we can not recognize condition 4. It should be + * supported later. + */ + for (int var_id : instr.GCCheckVars()) { + VLOG(4) << "GC sync " << global_scope_->GetNameById(var_id) << " " + << global_scope_->VarDesc(var_id); + + // persistable var will be ignore while GC + if (global_scope_->VarDesc(var_id) && + global_scope_->VarDesc(var_id)->Persistable()) { + continue; + } + + paddle::framework::Variable* var = global_scope_->Var(var_id); + if (var == nullptr) { + continue; + } + + if (var->IsType()) { + TensorRecordStream(*(var->GetMutable())); + } else if (var->IsType< + operators::reader:: + OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) { + // do nothing + } else if (var->IsType()) { + TensorRecordStream(*(var->GetMutable()->mutable_value())); + } else if (var->IsType()) { + auto* tensor_arr = var->GetMutable(); + for (auto& tensor : *tensor_arr) { + TensorRecordStream(tensor); + } + } else if (var->IsType>()) { + // do nothing + } else { + PADDLE_THROW(platform::errors::Unimplemented( + "The variable(%s) is not supported in eager deletion.", + framework::ToTypeName(var->Type()))); + } + } +} +#endif + void InterpreterCore::CheckGC(const Instruction& instr) { size_t instr_id = instr.Id(); auto& var_scope = *global_scope_; @@ -570,8 +686,21 @@ void InterpreterCore::CheckGC(const Instruction& instr) { if (is_ready) { VLOG(6) << "Async delete variable with name : " << var_scope.GetNameById(var_id); - gc_->Add(var_scope.Var(var_id), gc_event_.at(instr_id), - &instr.DeviceContext()); +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + if (IsInterpretercoreFastGCEnabled()) { + static_cast(gc_.get())->Add( + var_scope.Var(var_id)); + + } else { + static_cast(gc_.get())->Add( + var_scope.Var(var_id), gc_event_.at(instr_id), + &instr.DeviceContext()); + } +#else + static_cast(gc_.get())->Add( + var_scope.Var(var_id), gc_event_.at(instr_id), + &instr.DeviceContext()); +#endif } } } diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h index e6e6b7cdc3a1e15852010dccd5b3f381144cc879..277093c082fd92857be1dfb7016f6f2ce8c6d353 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.h +++ b/paddle/fluid/framework/new_executor/interpretercore.h @@ -72,6 +72,10 @@ class InterpreterCore { const std::vector& feed_tensors, bool prepare_feed); +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + void RecordStreamForGC(const Instruction& instr); +#endif + void CheckGC(const Instruction& instr); void RunInstructionAsync(size_t instr_id); diff --git a/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.cc b/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.cc new file mode 100644 index 0000000000000000000000000000000000000000..7beefec4487de31d2fa558153b7a0522545def72 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.cc @@ -0,0 +1,135 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.h" + +#if !defined(_WIN32) +#include +#else +#define NOMINMAX +#include +#endif // !_WIN32 + +namespace paddle { +namespace framework { + +InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector() { + WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true, + /*track_task*/ false); + queue_ = CreateSingleThreadedWorkQueue(options); +} + +InterpreterCoreEventGarbageCollector::~InterpreterCoreEventGarbageCollector() { + queue_.reset(nullptr); +} + +void InterpreterCoreEventGarbageCollector::Add( + Garbage garbage, platform::DeviceEvent& event, + const platform::DeviceContext* ctx) { + if (!garbage) { + return; + } + + if (max_memory_size_ <= 1) { + Free(garbage, event, ctx); + } else { + std::unique_ptr pending_delete_garbages; + { // lock guard + std::lock_guard guard(spinlock_); + cur_memory_size_ += garbage->size(); + garbages_->push_back(std::move(garbage)); + + if (cur_memory_size_ >= max_memory_size_) { + cur_memory_size_ = 0; + pending_delete_garbages = std::move(garbages_); + garbages_ = std::make_unique(); + } + } + } +} + +void InterpreterCoreEventGarbageCollector::Add( + Variable* var, platform::DeviceEvent& event, + const platform::DeviceContext* ctx) { + if (UNLIKELY(max_memory_size_ < 0) || var == nullptr) { + return; + } + + if (var->IsType()) { + Add(var->GetMutable()->MoveMemoryHolder(), event, ctx); + } else if (var->IsType< + operators::reader:: + OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) { + // TODO(xiongkun03) in old executor, this type of variable is not support + // eager deletion. so we just leave it here ? + } else if (var->IsType()) { + // TODO(xiongkun03) in old executor, this type of variable is not support + // eager deletion. so we just leave it here ? + } else if (var->IsType()) { + Add(var->GetMutable()->mutable_value()->MoveMemoryHolder(), + event, ctx); + var->GetMutable()->mutable_rows()->clear(); + } else if (var->IsType()) { + auto* tensor_arr = var->GetMutable(); + for (auto& t : *tensor_arr) { + Add(t.MoveMemoryHolder(), event, ctx); + } + } else if (var->IsType>()) { + // NOTE(@xiongkun03) conditional_op / while_op will create a STEP_SCOPE + // refer to executor.cc to see what old garbage collector does. + // do nothing, because the sub scope will be deleted by sub-executor. + } else { + PADDLE_THROW(platform::errors::Unimplemented( + "The variable(%s) is not supported in eager deletion.", + framework::ToTypeName(var->Type()))); + } +} + +void InterpreterCoreEventGarbageCollector::Free( + GarbageQueue* garbages, platform::DeviceEvent& event, + const platform::DeviceContext* ctx) { + event.Record(ctx); + event.SetFininshed(); // Only for CPU Event + queue_->AddTask([ container = garbages, event = &event ]() { + while (!event->Query()) { +#if defined(_WIN32) + SleepEx(50, FALSE); +#else + sched_yield(); +#endif + continue; + } + delete container; + }); +} + +void InterpreterCoreEventGarbageCollector::Free( + Garbage& garbage, platform::DeviceEvent& event, + const platform::DeviceContext* ctx) { + event.Record(ctx); + event.SetFininshed(); // Only for CPU Event + queue_->AddTask([ container = garbage, event = &event ]() { + while (!event->Query()) { +#if defined(_WIN32) + SleepEx(50, FALSE); +#else + sched_yield(); +#endif + continue; + } + }); +} + +} // namespace framework +} // namespace paddle \ No newline at end of file diff --git a/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.h b/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.h new file mode 100644 index 0000000000000000000000000000000000000000..ab329f196da34661c3693a9784eaac1055f09034 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.h @@ -0,0 +1,44 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h" +#include "paddle/fluid/framework/new_executor/workqueue/workqueue.h" + +namespace paddle { +namespace framework { + +class InterpreterCoreEventGarbageCollector + : public InterpreterCoreGarbageCollector { + public: + InterpreterCoreEventGarbageCollector(); + ~InterpreterCoreEventGarbageCollector(); + + virtual void Add(Variable* var, platform::DeviceEvent& event, + const platform::DeviceContext* ctx) override; + + private: + void Add(Garbage garbage, platform::DeviceEvent& event, + const platform::DeviceContext* ctx); + void Free(GarbageQueue* garbages, platform::DeviceEvent& event, + const platform::DeviceContext* ctx); + void Free(Garbage& garbage, platform::DeviceEvent& event, + const platform::DeviceContext* ctx); + + std::unique_ptr queue_; + paddle::memory::SpinLock spinlock_; +}; +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.cc b/paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.cc new file mode 100644 index 0000000000000000000000000000000000000000..784cfca943ea1d88546e5d024bbdeaece2c55849 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.cc @@ -0,0 +1,76 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.h" + +namespace paddle { +namespace framework { + +void InterpreterCoreFastGarbageCollector::Add(Variable* var) { + if (UNLIKELY(max_memory_size_ < 0) || var == nullptr) { + return; + } + + if (var->IsType()) { + Add(var->GetMutable()->MoveMemoryHolder()); + } else if (var->IsType< + operators::reader:: + OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) { + // TODO(xiongkun03) in old executor, this type of variable is not support + // eager deletion. so we just leave it here ? + } else if (var->IsType()) { + // TODO(xiongkun03) in old executor, this type of variable is not support + // eager deletion. so we just leave it here ? + } else if (var->IsType()) { + Add(var->GetMutable()->mutable_value()->MoveMemoryHolder()); + var->GetMutable()->mutable_rows()->clear(); + } else if (var->IsType()) { + auto* tensor_arr = var->GetMutable(); + for (auto& t : *tensor_arr) { + Add(t.MoveMemoryHolder()); + } + } else if (var->IsType>()) { + // NOTE(@xiongkun03) conditional_op / while_op will create a STEP_SCOPE + // refer to executor.cc to see what old garbage collector does. + // do nothing, because the sub scope will be deleted by sub-executor. + } else { + PADDLE_THROW(platform::errors::Unimplemented( + "The variable(%s) is not supported in eager deletion.", + framework::ToTypeName(var->Type()))); + } +} + +void InterpreterCoreFastGarbageCollector::Add(Garbage garbage) { + if (!garbage) { + return; + } + + if (max_memory_size_ > 1) { + std::unique_ptr pending_delete_garbages; + { // lock guard + std::lock_guard guard(spinlock_); + cur_memory_size_ += garbage->size(); + garbages_->push_back(std::move(garbage)); + + if (cur_memory_size_ >= max_memory_size_) { + cur_memory_size_ = 0; + pending_delete_garbages = std::move(garbages_); + garbages_ = std::make_unique(); + } + } + } +} + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.h b/paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.h new file mode 100644 index 0000000000000000000000000000000000000000..ad19db049468f2e88ee11d7340e09fbd8dc42161 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.h @@ -0,0 +1,30 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h" + +namespace paddle { +namespace framework { + +class InterpreterCoreFastGarbageCollector + : public InterpreterCoreGarbageCollector { + public: + virtual void Add(Variable* var) override; + + private: + void Add(Garbage garbage); +}; +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc index 40537815b48bfc4b4314ed71f4298a9327d369d4..9345546e65f99d486d628997039288433dabd0a7 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc @@ -19,114 +19,23 @@ namespace paddle { namespace framework { InterpreterCoreGarbageCollector::InterpreterCoreGarbageCollector() { - garbages_.reset(new GarbageQueue()); - max_memory_size_ = static_cast(GetEagerDeletionThreshold()); + garbages_ = std::make_unique(); + max_memory_size_ = static_cast(GetEagerDeletionThreshold()); cur_memory_size_ = 0; - - WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true, - /*track_task*/ false); - queue_ = CreateSingleThreadedWorkQueue(options); } -InterpreterCoreGarbageCollector::~InterpreterCoreGarbageCollector() { - queue_.reset(nullptr); +void InterpreterCoreGarbageCollector::Add(Variable* var) { + PADDLE_THROW( + platform::errors::Unimplemented("Not allowed to call the member function " + "of InterpreterCoreGarbageCollector")); } -void InterpreterCoreGarbageCollector::Add( - std::shared_ptr garbage, - paddle::platform::DeviceEvent& event, const platform::DeviceContext* ctx) { - if (max_memory_size_ <= 1) { - Free(garbage, event, ctx); - } else { - if (!garbage) return; - GarbageQueue* garbage_ptr = nullptr; - { - std::lock_guard guard(spinlock_); - cur_memory_size_ += garbage->size(); - garbages_->push_back(std::move(garbage)); - - if (cur_memory_size_ >= max_memory_size_) { - cur_memory_size_ = 0; - garbage_ptr = garbages_.release(); - garbages_.reset(new GarbageQueue()); - } - } - if (garbage_ptr) { - Free(garbage_ptr, event, ctx); - } - } -} - -void InterpreterCoreGarbageCollector::Add(paddle::framework::Variable* var, - paddle::platform::DeviceEvent& event, +void InterpreterCoreGarbageCollector::Add(Variable* var, + platform::DeviceEvent& event, const platform::DeviceContext* ctx) { - if (!var) { - return; - } - - if (var->IsType()) { - Add(var->GetMutable()->MoveMemoryHolder(), event, ctx); - } else if (var->IsType< - operators::reader:: - OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) { - // TODO(xiongkun03) in old executor, this type of variable is not support - // eager deletion. so we just leave it here ? - } else if (var->IsType()) { - // TODO(xiongkun03) in old executor, this type of variable is not support - // eager deletion. so we just leave it here ? - } else if (var->IsType()) { - Add(var->GetMutable()->mutable_value()->MoveMemoryHolder(), - event, ctx); - var->GetMutable()->mutable_rows()->clear(); - } else if (var->IsType()) { - auto* tensor_arr = var->GetMutable(); - for (auto& t : *tensor_arr) { - Add(t.MoveMemoryHolder(), event, ctx); - } - } else if (var->IsType>()) { - // NOTE(@xiongkun03) conditional_op / while_op will create a STEP_SCOPE - // refer to executor.cc to see what old garbage collector does. - // do nothing, because the sub scope will be deleted by sub-executor. - } else { - PADDLE_THROW(platform::errors::Unimplemented( - "The variable(%s) is not supported in eager deletion.", - framework::ToTypeName(var->Type()))); - } -} - -void InterpreterCoreGarbageCollector::Free(GarbageQueue* garbages, - paddle::platform::DeviceEvent& event, - const platform::DeviceContext* ctx) { - event.Record(ctx); - event.SetFininshed(); // Only for CPU Event - queue_->AddTask([ container = garbages, event = &event ]() { - while (!event->Query()) { -#if defined(_WIN32) - SleepEx(50, FALSE); -#else - sched_yield(); -#endif - continue; - } - delete container; - }); -} - -void InterpreterCoreGarbageCollector::Free( - std::shared_ptr& garbage, - paddle::platform::DeviceEvent& event, const platform::DeviceContext* ctx) { - event.Record(ctx); - event.SetFininshed(); // Only for CPU Event - queue_->AddTask([ container = garbage, event = &event ]() { - while (!event->Query()) { -#if defined(_WIN32) - SleepEx(50, FALSE); -#else - sched_yield(); -#endif - continue; - } - }); + PADDLE_THROW( + platform::errors::Unimplemented("Not allowed to call the member function " + "of InterpreterCoreGarbageCollector")); } } // namespace framework diff --git a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h index ffb22092701b8af9e476a6ee67acba3292ea51e9..5a0554d577affc1bbca42e2b1113c7ff1d270351 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h +++ b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h @@ -13,54 +13,31 @@ // limitations under the License. #pragma once -#if !defined(_WIN32) -#include -#else -#define NOMINMAX -#include -#endif // !_WIN32 - #include -#include - -#include "paddle/fluid/framework/new_executor/workqueue/workqueue.h" #include "paddle/fluid/memory/allocation/spin_lock.h" #include "paddle/fluid/platform/device_event.h" namespace paddle { namespace framework { -using GarbageQueue = std::deque>; +using Garbage = std::shared_ptr; +using GarbageQueue = std::deque; + class InterpreterCoreGarbageCollector { public: InterpreterCoreGarbageCollector(); - - ~InterpreterCoreGarbageCollector(); - - void Add(std::shared_ptr garbage, // NOLINT - paddle::platform::DeviceEvent& event, // NOLINT - const platform::DeviceContext* ctx); - - void Add(paddle::framework::Variable* var, - paddle::platform::DeviceEvent& event, // NOLINT - const platform::DeviceContext* ctx); - + virtual ~InterpreterCoreGarbageCollector(){}; + virtual void Add(Variable* var); + virtual void Add(Variable* var, platform::DeviceEvent& event, + const platform::DeviceContext* ctx); DISABLE_COPY_AND_ASSIGN(InterpreterCoreGarbageCollector); - private: - void Free(GarbageQueue* garbages, - paddle::platform::DeviceEvent& event, // NOLINT - const platform::DeviceContext* ctx); - - void Free(std::shared_ptr& garbage, // NOLINT - paddle::platform::DeviceEvent& event, // NOLINT - const platform::DeviceContext* ctx); - + protected: std::unique_ptr garbages_; - size_t max_memory_size_; - size_t cur_memory_size_; - std::unique_ptr queue_; - paddle::memory::SpinLock spinlock_; + int64_t max_memory_size_; + int64_t cur_memory_size_; + memory::SpinLock spinlock_; }; + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/tensor.cc b/paddle/fluid/framework/tensor.cc index e59733b4fbe9a0aeeacaccf23f02809db391dbbd..2ccd295577039d259ca346ff93ddcc7f87b10c16 100644 --- a/paddle/fluid/framework/tensor.cc +++ b/paddle/fluid/framework/tensor.cc @@ -98,7 +98,9 @@ void* Tensor::mutable_data(const platform::Place& place, /* some versions of boost::variant don't have operator!= */ if (holder_ == nullptr || !(holder_->place() == place) || - holder_->size() < size + offset_) { + holder_->size() < size + offset_ || + !(platform::is_gpu_place(place) && + memory::InSameStream(holder_, stream))) { holder_.reset(); holder_ = memory::AllocShared(place, size, stream); offset_ = 0; diff --git a/paddle/fluid/memory/allocation/allocator_facade.cc b/paddle/fluid/memory/allocation/allocator_facade.cc index 3a53954436c89e2810ff77231e5ead89877336b2..473a2d28877a66d2d97b0f205ece35044e6e1b08 100644 --- a/paddle/fluid/memory/allocation/allocator_facade.cc +++ b/paddle/fluid/memory/allocation/allocator_facade.cc @@ -26,6 +26,7 @@ #include "paddle/fluid/platform/place.h" #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) +#include #include "paddle/fluid/memory/allocation/cuda_allocator.h" #include "paddle/fluid/memory/allocation/pinned_allocator.h" #include "paddle/fluid/memory/allocation/stream_safe_cuda_allocator.h" @@ -151,11 +152,12 @@ class AllocatorFacadePrivate { } #endif #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) - if (FLAGS_use_stream_safe_cuda_allocator) { - LOG(WARNING) << "FLAGS_use_stream_safe_cuda_allocator is invalid for " - "naive_best_fit strategy"; - FLAGS_use_stream_safe_cuda_allocator = false; - } + PADDLE_ENFORCE_EQ( + FLAGS_use_stream_safe_cuda_allocator, false, + paddle::platform::errors::Unimplemented( + "StreamSafeCUDAAllocator is only implemented for auto_growth " + "strategy, not support naive_best_fit strategy")); + for (int dev_id = 0; dev_id < platform::GetGPUDeviceCount(); ++dev_id) { InitNaiveBestFitCUDAAllocator(platform::CUDAPlace(dev_id)); } @@ -185,9 +187,6 @@ class AllocatorFacadePrivate { #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) allow_free_idle_chunk_ = allow_free_idle_chunk; if (FLAGS_use_stream_safe_cuda_allocator) { - default_streams_ = - std::vector(platform::GetGPUDeviceCount(), nullptr); - // TODO(Ruibiao): Support multi-stream allocator for other strategies for (int dev_id = 0; dev_id < platform::GetGPUDeviceCount(); ++dev_id) { InitStreamSafeCUDAAllocator(platform::CUDAPlace(dev_id), nullptr); @@ -232,11 +231,11 @@ class AllocatorFacadePrivate { } #endif #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) - if (FLAGS_use_stream_safe_cuda_allocator) { - LOG(WARNING) << "FLAGS_use_stream_safe_cuda_allocator is invalid for " - "thread_local strategy"; - FLAGS_use_stream_safe_cuda_allocator = false; - } + PADDLE_ENFORCE_EQ( + FLAGS_use_stream_safe_cuda_allocator, false, + paddle::platform::errors::Unimplemented( + "StreamSafeCUDAAllocator is only implemented for auto_growth " + "strategy, not support thread_local strategy")); for (int dev_id = 0; dev_id < platform::GetGPUDeviceCount(); ++dev_id) { InitThreadLocalCUDAAllocator(platform::CUDAPlace(dev_id)); @@ -282,50 +281,45 @@ class AllocatorFacadePrivate { } #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + bool HasCUDAAllocator(const platform::CUDAPlace& place, + const gpuStream_t& stream) { + auto it = cuda_allocators_.find(place); + if (it == cuda_allocators_.end()) { + return false; + } + const std::map>& allocator_map = + it->second; + return allocator_map.find(stream) != allocator_map.end(); + } + const std::shared_ptr& GetAllocator( const platform::CUDAPlace& place, const gpuStream_t& stream, bool create_if_not_found = false) { - auto place_it = cuda_allocators_.find(place); - PADDLE_ENFORCE_NE(place_it, cuda_allocators_.end(), - platform::errors::NotFound( - "No allocator found for the place %s", place)); - - const std::map>& allocator_map = - place_it->second; - auto stream_it = allocator_map.find(stream); - if (stream_it == allocator_map.end()) { - if (create_if_not_found) { - InitStreamSafeCUDAAllocator(place, stream); + { // shared_lock_guard + std::shared_lock lock_guard( + cuda_allocator_mutex_); + if (LIKELY(HasCUDAAllocator(place, stream))) { return cuda_allocators_[place][stream]; } else { - PADDLE_THROW(platform::errors::NotFound( - "No allocator found for stream %s in place %s", stream, place)); + PADDLE_ENFORCE_NE(create_if_not_found, false, + platform::errors::NotFound( + "No allocator found for stream %s in place %s " + "with create_if_not_found = false", + stream, place)); } } - return stream_it->second; - } - const gpuStream_t& GetDefaultStream(const platform::CUDAPlace& place) { - int dev_id = place.GetDeviceId(); - gpuStream_t& default_stream = default_streams_[dev_id]; - if (UNLIKELY(default_stream == nullptr)) { - /* NOTE(Ruibiao): Here if we set default_stream by code " default_stream = - * platform::stream::get_current_stream(place.GetDeviceId())->raw_stream() - * ", then it will be fail to make target 'jit_kernel_benchmark', says a - * undefined reference to `paddle::platform::DeviceContextPool::Get( - * paddle::platform::Place const&)' in function - * `paddle::platform::stream::get_current_stream(int)'. However, target - * allocator_facade will not be affected. It seems a circular dependency - * problem between 'cuda_stream' and 'device_context' that causes this - * strange bug. - */ - platform::DeviceContextPool& pool = - platform::DeviceContextPool::Instance(); - default_stream = - static_cast(pool.Get(place))->stream(); - InitStreamSafeCUDAAllocator(place, default_stream); + { // unique_lock_guard + std::unique_lock lock_guard( + cuda_allocator_mutex_); + InitStreamSafeCUDAAllocator(place, stream); + return cuda_allocators_[place][stream]; } - return default_stream; + } + + gpuStream_t GetDefaultStream(const platform::CUDAPlace& place) { + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + return static_cast(pool.Get(place))->stream(); } void RecordStream(std::shared_ptr allocation, @@ -443,18 +437,12 @@ class AllocatorFacadePrivate { "Only support auto-growth strategey for StreamSafeCUDAAllocator, " "the allocator strategy %d is unsupported for multi-stream", static_cast(strategy_))); - VLOG(9) << "Init CUDA allocator for stream " << stream << " in place " << p; - std::lock_guard lock_guard(cuda_allocators_lock_); - try { - GetAllocator(p, stream); - VLOG(9) << "Other thread had build a allocator for stream " << stream - << " in place " << p; - } catch (platform::EnforceNotMet&) { + if (LIKELY(!HasCUDAAllocator(p, stream))) { + VLOG(8) << "Init CUDA allocator for stream " << stream << " in place " + << p; InitAutoGrowthCUDAAllocator(p, stream); WrapStreamSafeCUDAAllocator(p, stream); WrapCUDARetryAllocator(p, stream, FLAGS_gpu_allocator_retry_time); - } catch (...) { - throw; } } @@ -618,7 +606,7 @@ class AllocatorFacadePrivate { void WrapStreamSafeCUDAAllocator(platform::CUDAPlace p, gpuStream_t stream) { const std::shared_ptr& underlying_allocator = - GetAllocator(p, stream); + cuda_allocators_[p][stream]; cuda_allocators_[p][stream] = std::make_shared( underlying_allocator, p, stream); } @@ -629,7 +617,7 @@ class AllocatorFacadePrivate { retry_time, 0, platform::errors::InvalidArgument( "Retry time should be larger than 0, but got %d", retry_time)); - std::shared_ptr allocator = GetAllocator(p, stream); + std::shared_ptr allocator = cuda_allocators_[p][stream]; allocator = std::make_shared(allocator, retry_time); } @@ -784,8 +772,7 @@ class AllocatorFacadePrivate { #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) // a standalone CUDA allocator to support multi-stream GC in new executor CUDAAllocatorMap cuda_allocators_; - std::vector default_streams_; - SpinLock cuda_allocators_lock_; + std::shared_timed_mutex cuda_allocator_mutex_; #ifdef PADDLE_WITH_CUDA std::unordered_map> cuda_graph_allocator_map_; @@ -903,6 +890,31 @@ std::shared_ptr AllocatorFacade::AllocShared( #endif } +bool AllocatorFacade::InSameStream( + const std::shared_ptr& allocation, + const platform::Stream& stream) { +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + PADDLE_ENFORCE_EQ( + FLAGS_use_stream_safe_cuda_allocator, true, + platform::errors::Unimplemented( + "StreamSafeCUDAAllocator is disabled, you should not call this " + "multi-stream 'InSameStream' function. To enable it, you can enter" + "'export FLAGS_use_stream_safe_cuda_allocator=true' in the " + "terminal.")); + +#ifdef PADDLE_WITH_CUDA + if (UNLIKELY(platform::CUDAGraph::IsCapturing())) { + PADDLE_THROW(platform::errors::Unavailable( + "Not allow to use StreamSafeCUDAAllocator with CUDAGraphAllocator")); + } +#endif + gpuStream_t s = reinterpret_cast(stream.id()); + return s == GetStream(allocation); +#else + PADDLE_THROW(platform::errors::PreconditionNotMet("Not compiled with GPU.")); +#endif +} + #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) AllocationPtr AllocatorFacade::Alloc(const platform::Place& place, size_t size, const gpuStream_t& stream) { diff --git a/paddle/fluid/memory/allocation/allocator_facade.h b/paddle/fluid/memory/allocation/allocator_facade.h index b10ea9948d4497af38f9cd172180771553c898b9..d59ecaece5a70f461b8443d06488ba6f2fe5446b 100644 --- a/paddle/fluid/memory/allocation/allocator_facade.h +++ b/paddle/fluid/memory/allocation/allocator_facade.h @@ -61,6 +61,10 @@ class AllocatorFacade { std::shared_ptr AllocShared(const platform::Place& place, size_t size, const platform::Stream& stream); + + bool InSameStream(const std::shared_ptr& allocation, + const platform::Stream& stream); + #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) // TODO(zhiqiu): change gpuStream_t to platform::Stream if needed. AllocationPtr Alloc(const platform::Place& place, size_t size, diff --git a/paddle/fluid/memory/allocation/stream_safe_cuda_allocator.cc b/paddle/fluid/memory/allocation/stream_safe_cuda_allocator.cc index 0d0318859c6262875fcea5b08ba60c7a65233b8f..a4f766f1d1abc66ccd39d493e8c4abc591258e8d 100644 --- a/paddle/fluid/memory/allocation/stream_safe_cuda_allocator.cc +++ b/paddle/fluid/memory/allocation/stream_safe_cuda_allocator.cc @@ -118,6 +118,7 @@ bool StreamSafeCUDAAllocator::IsAllocThreadSafe() const { return true; } Allocation* StreamSafeCUDAAllocator::AllocateImpl(size_t size) { ProcessUnfreedAllocations(); + VLOG(8) << "Try allocate " << size << " bytes"; AllocationPtr underlying_allocation; try { underlying_allocation = underlying_allocator_->Allocate(size); @@ -150,10 +151,12 @@ void StreamSafeCUDAAllocator::FreeImpl(Allocation* allocation) { "StreamSafeCUDAAllocation*", allocation)); VLOG(8) << "Try free allocation " << stream_safe_cuda_allocation->ptr(); + std::lock_guard lock_guard(unfreed_allocation_lock_); if (stream_safe_cuda_allocation->CanBeFreed()) { + VLOG(9) << "Directly delete allocation"; delete stream_safe_cuda_allocation; } else { - std::lock_guard lock_guard(unfreed_allocation_lock_); + VLOG(9) << "Put into unfreed_allocation list"; unfreed_allocations_.emplace_back(stream_safe_cuda_allocation); } } diff --git a/paddle/fluid/memory/malloc.cc b/paddle/fluid/memory/malloc.cc index 6a8bb59260b34f8dd035cb5d5e60dcecf1579352..3e859377e98d801e775461d9cfaaa50fe9c43e8e 100644 --- a/paddle/fluid/memory/malloc.cc +++ b/paddle/fluid/memory/malloc.cc @@ -41,6 +41,12 @@ std::shared_ptr AllocShared(const platform::Place& place, stream); } +bool InSameStream(const std::shared_ptr& allocation, + const platform::Stream& stream) { + return allocation::AllocatorFacade::Instance().InSameStream(allocation, + stream); +} + #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) AllocationPtr Alloc(const platform::CUDAPlace& place, size_t size, const gpuStream_t& stream) { diff --git a/paddle/fluid/memory/malloc.h b/paddle/fluid/memory/malloc.h index b2ad3a98106a36acc5e920d8394f15752d3e839f..7069fb46203d6d0d96be51e556806e467d019ba0 100644 --- a/paddle/fluid/memory/malloc.h +++ b/paddle/fluid/memory/malloc.h @@ -45,6 +45,9 @@ extern std::shared_ptr AllocShared(const platform::Place& place, size_t size, const platform::Stream& stream); +extern bool InSameStream(const std::shared_ptr& allocation, + const platform::Stream& stream); + #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) extern AllocationPtr Alloc(const platform::CUDAPlace& place, size_t size, const gpuStream_t& stream); diff --git a/python/paddle/fluid/tests/unittests/interpreter/CMakeLists.txt b/python/paddle/fluid/tests/unittests/interpreter/CMakeLists.txt index 42d109c17baa8d276fc88e758b4558b83d2ff133..c1a2c36d8a344a4fa49156f69c157154369d503f 100644 --- a/python/paddle/fluid/tests/unittests/interpreter/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/interpreter/CMakeLists.txt @@ -2,7 +2,8 @@ file(GLOB TEST_INTERP_CASES RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") string(REPLACE ".py" "" TEST_INTERP_CASES "${TEST_INTERP_CASES}") foreach(target ${TEST_INTERP_CASES}) - py_test_modules(${target} MODULES ${target}) + py_test_modules(${target} MODULES ${target} ENVS FLAGS_allocator_strategy=auto_growth FLAGS_use_stream_safe_cuda_allocator=true FLAGS_fast_eager_deletion_mode=false FLAGS_eager_delete_tensor_gb=0) + py_test_modules(${target}_non_eager_deletion MODULES ${target} ENVS FLAGS_allocator_strategy=auto_growth FLAGS_use_stream_safe_cuda_allocator=true FLAGS_fast_eager_deletion_mode=false FLAGS_eager_delete_tensor_gb=0.000001) + py_test_modules(${target}_fast_gc MODULES ${target} ENVS FLAGS_allocator_strategy=auto_growth FLAGS_use_stream_safe_cuda_allocator=true FLAGS_fast_eager_deletion_mode=true FLAGS_eager_delete_tensor_gb=0) + py_test_modules(${target}_fast_gc_non_eager_deletion MODULES ${target} ENVS FLAGS_allocator_strategy=auto_growth FLAGS_use_stream_safe_cuda_allocator=true FLAGS_fast_eager_deletion_mode=true FLAGS_eager_delete_tensor_gb=0.000001) endforeach() - -set_tests_properties(test_standalone_executor PROPERTIES ENVIRONMENT FLAGS_use_stream_safe_cuda_allocator=true)