Unverified commit 0c7153a4, authored by From00, committed by GitHub

Utilize StreamSafeCUDAAllocator to support fast GC in new executor (#37642)

* fix reshape move storage error

* remove needless set type

* alloc tensor by shared storage

* Utilize StreamSafeCUDAAllocator to support fast GC in new executor

* Fix compile error for Windows and ROCm

* Fix compile error for Windows

* Modify UT stream_safe_cuda_alloc_test

* Modify UT stream_safe_cuda_alloc_test

* Rewrite fast GC

* Rewrite fast GC

* Fix compile error for BOOST_GET_CONST

* Fix compile error for BOOST_GET_CONST

* Changes default stream for StreamSafeCUDAAllocator

* Fix a small CI error

* Remove some redundant code

* Fix conflict

* Fix compile error for ROCm

* Fix Windows CI error

* Fix CI error

* Remove some unnecessary code

* Fix CI error

* Add UT for fast GC

* Fix CI error

* add device-agnostic stream class

* add stream.h

* fix ut

* fix cpu compile

* Use RWLock in GetAllocator

* Fix CI error
Co-authored-by: Chen Weihang <chenweihang@baidu.com>
Co-authored-by: zhiqiu <chenqiuliang@baidu.com>
Parent: bed71992
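The squashed commits above boil down to one idea: when both FLAGS_fast_eager_deletion_mode and FLAGS_use_stream_safe_cuda_allocator are on, the new executor releases a variable's memory holder right after its op runs and lets StreamSafeCUDAAllocator defer the physical free until every stream recorded on that allocation has finished. A minimal sketch of that hand-off (not part of the diff; FastRelease and its parameters are illustrative, only memory::RecordStream is an API used by this PR):

// Sketch only: how fast GC hands a finished op's output back to the allocator.
// `holder` is the tensor's memory::Allocation, `op_stream` the stream the op ran on.
void FastRelease(std::shared_ptr<memory::Allocation> holder, gpuStream_t op_stream) {
  // Tell the stream-safe allocator which stream still uses this block ...
  memory::RecordStream(holder, op_stream);
  // ... then drop the reference right away; the allocator frees the block
  // only after the recorded stream has passed this point.
  holder.reset();
}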
set(INTERPRETERCORE_DEPS op_registry device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor nan_inf_utils)
graph_to_program_pass variable_helper timer monitor nan_inf_utils interpretercore_event_garbage_collector)
if(WITH_GPU)
list(APPEND INTERPRETERCORE_DEPS interpretercore_fast_garbage_collector)
endif()
add_subdirectory(workqueue)
cc_library(data_transfer SRCS data_transfer.cc DEPS enforce scope glog)
cc_library(new_executor_defs SRCS new_executor_defs.cc DEPS enforce glog scope)
cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS} executor_gc_helper)
cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS garbage_collector)
cc_library(interpretercore_event_garbage_collector SRCS interpretercore_event_garbage_collector.cc DEPS interpretercore_garbage_collector)
cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS} workqueue new_executor_defs data_transfer)
cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog new_executor_defs)
cc_library(stream_analyzer SRCS stream_analyzer.cc DEPS ${DEVICE_EVENT_LIBS} glog device_context new_executor_defs)
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector stream_analyzer event_manager)
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_event_garbage_collector stream_analyzer event_manager)
cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore)
if(WITH_GPU OR WITH_ROCM)
if(WITH_GPU)
nv_library(interpretercore_fast_garbage_collector SRCS interpretercore_fast_garbage_collector.cc DEPS interpretercore_garbage_collector)
elseif(WITH_ROCM)
hip_library(interpretercore_fast_garbage_collector SRCS interpretercore_fast_garbage_collector.cc DEPS interpretercore_garbage_collector)
endif()
target_link_libraries(interpretercore interpretercore_fast_garbage_collector)
endif()
# cc_binary(standalone_executor_test SRCS standalone_executor_test.cc DEPS interpretercore standalone_executor operator op_registry executor ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} profiler)
# skip win32 since wget is not installed by default on windows machine.
# skip COVERAGE_CI since the test runs slowly because of instrumentation.
if (WITH_TESTING AND NOT WIN32 AND NOT WITH_COVERAGE AND NOT "$ENV{CI_SKIP_CPP_TEST}" STREQUAL "ON")
......
@@ -16,10 +16,15 @@
#include <unordered_set>
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/profiler.h"
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
#include "paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.h"
#endif
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, true,
"Use inplace in new executor");
PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true,
@@ -28,6 +33,8 @@ PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope, true,
DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);
DECLARE_bool(fast_eager_deletion_mode);
DECLARE_bool(use_stream_safe_cuda_allocator);
constexpr const char* kExceptionCaught = "ExceptionCaught";
constexpr const char* kTaskCompletion = "TaskCompletion";
@@ -37,6 +44,10 @@ namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;
bool IsInterpretercoreFastGCEnabled() {
return FLAGS_fast_eager_deletion_mode && FLAGS_use_stream_safe_cuda_allocator;
}
InterpreterCore::InterpreterCore(const platform::Place& place,
const BlockDesc& block,
VariableScope* global_scope)
@@ -47,7 +58,16 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
is_build_ = false;
async_work_queue_.reset(
new interpreter::AsyncWorkQueue(kHostNumThreads, &main_thread_blocker_));
gc_.reset(new InterpreterCoreGarbageCollector());
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
if (IsInterpretercoreFastGCEnabled()) {
gc_ = std::make_unique<InterpreterCoreFastGarbageCollector>();
} else {
gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
}
#else
gc_ = std::make_unique<InterpreterCoreEventGarbageCollector>();
#endif
exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);
@@ -513,7 +533,10 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) {
try {
RunInstruction(instr_node);
// GC information
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
RecordStreamForGC(instr_node);
#endif
CheckGC(instr_node);
} catch (platform::EnforceNotMet& ex) {
framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
@@ -552,6 +575,99 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) {
}
}
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
if (!IsInterpretercoreFastGCEnabled() ||
instr.KernelType() != OpFuncType::kQueueAsync) {
return;
}
gpuStream_t stream = reinterpret_cast<const platform::CUDADeviceContext&>(
instr.DeviceContext())
.stream();
auto TensorRecordStream = [&stream](Tensor& tensor) {
auto allocation = tensor.Holder();
if (allocation == nullptr) {
return;
}
const platform::Place& place = allocation->place();
if (platform::is_gpu_place(place)) {
memory::RecordStream(allocation, stream);
} else if (platform::is_cuda_pinned_place(place)) {
// TODO(Ruibiao): We should ensure that the tensor is not freed until the
// H2D copies are done. However, simply launching a CUDA runtime callback
// on the H2D stream may introduce a high performance overhead. Since all
// the H2D cases we meet at present are copies from CPUPlace, we just log
// a WARNING here. A better design is required.
LOG(WARNING) << "Copying data from a CUDAPinned tensor in an asynchronous "
"manner may lead to data inconsistency";
} else {
// memory copies involve CPUPlace are always synchronous, so just do
// nothing here
}
};
/* NOTE(Ruibiao): Cross-stream tensor synchronization is required only when
* all the following conditions are satisfied:
* 1. The tensor will be GC'ed after running the instruction, i.e., it is in
* instr.GCCheckVars.
* 2. The stream that initialized this tensor is different from the stream
* the instruction runs in.
* 3. The tensor is one of the instruction's inputs, because we assume that
* an instruction initializes all of its output tensors on its own stream.
* 4. In the OP function of this instruction, the tensor is an input of an
* async CUDA kernel.
*
* Here we only check the first condition, because:
* 1. Since RecordStream returns directly when the recorded stream equals the
* owning stream, recording the same stream that initialized the tensor has
* little overhead. Conversely, it may take more time to extract the
* cross-stream input vars from instr.GCCheckVars.
* 2. The instruction currently has no idea which vars are involved in async
* execution inside the OP function, so condition 4 cannot be recognized yet.
* It should be supported later.
*/
for (int var_id : instr.GCCheckVars()) {
VLOG(4) << "GC sync " << global_scope_->GetNameById(var_id) << " "
<< global_scope_->VarDesc(var_id);
// persistable vars are ignored during GC
if (global_scope_->VarDesc(var_id) &&
global_scope_->VarDesc(var_id)->Persistable()) {
continue;
}
paddle::framework::Variable* var = global_scope_->Var(var_id);
if (var == nullptr) {
continue;
}
if (var->IsType<LoDTensor>()) {
TensorRecordStream(*(var->GetMutable<LoDTensor>()));
} else if (var->IsType<
operators::reader::
OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
// do nothing
} else if (var->IsType<SelectedRows>()) {
TensorRecordStream(*(var->GetMutable<SelectedRows>()->mutable_value()));
} else if (var->IsType<LoDTensorArray>()) {
auto* tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& tensor : *tensor_arr) {
TensorRecordStream(tensor);
}
} else if (var->IsType<std::vector<Scope*>>()) {
// do nothing
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"The variable(%s) is not supported in eager deletion.",
framework::ToTypeName(var->Type())));
}
}
}
#endif
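To make the NOTE above concrete, here is a hedged sketch (not part of the diff) of the cross-stream case that RecordStreamForGC guards against: a buffer allocated on stream s1 is consumed by a kernel launched on stream s2, so s2 must be recorded on the allocation before the variable is garbage-collected. It assumes platform::Stream can be built from a raw stream cast to platform::StreamId, and LaunchConsumerKernelOnStream is a placeholder for any kernel launch:

// Sketch only: cross-stream use of an allocation (conditions 2 and 3 of the NOTE).
auto buf = memory::AllocShared(
    place, /*size=*/4096,
    platform::Stream(reinterpret_cast<platform::StreamId>(s1)));  // owned by s1
LaunchConsumerKernelOnStream(buf->ptr(), s2);  // hypothetical kernel reading buf on s2
memory::RecordStream(buf, s2);                 // tell the allocator s2 also uses buf
buf.reset();  // safe: the block is physically freed only after s2 passes this point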
void InterpreterCore::CheckGC(const Instruction& instr) {
size_t instr_id = instr.Id();
auto& var_scope = *global_scope_;
@@ -570,9 +686,22 @@ void InterpreterCore::CheckGC(const Instruction& instr) {
if (is_ready) {
VLOG(6) << "Async delete variable with name : "
<< var_scope.GetNameById(var_id);
gc_->Add(var_scope.Var(var_id), gc_event_.at(instr_id),
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
if (IsInterpretercoreFastGCEnabled()) {
static_cast<InterpreterCoreFastGarbageCollector*>(gc_.get())->Add(
var_scope.Var(var_id));
} else {
static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
var_scope.Var(var_id), gc_event_.at(instr_id),
&instr.DeviceContext());
}
#else
static_cast<InterpreterCoreEventGarbageCollector*>(gc_.get())->Add(
var_scope.Var(var_id), gc_event_.at(instr_id),
&instr.DeviceContext());
#endif
}
}
}
......
@@ -72,6 +72,10 @@ class InterpreterCore {
const std::vector<framework::LoDTensor>& feed_tensors,
bool prepare_feed);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
void RecordStreamForGC(const Instruction& instr);
#endif
void CheckGC(const Instruction& instr);
void RunInstructionAsync(size_t instr_id);
......
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.h"
#if !defined(_WIN32)
#include <sched.h>
#else
#define NOMINMAX
#include <windows.h>
#endif // !_WIN32
namespace paddle {
namespace framework {
InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector() {
WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ false);
queue_ = CreateSingleThreadedWorkQueue(options);
}
InterpreterCoreEventGarbageCollector::~InterpreterCoreEventGarbageCollector() {
queue_.reset(nullptr);
}
void InterpreterCoreEventGarbageCollector::Add(
Garbage garbage, platform::DeviceEvent& event,
const platform::DeviceContext* ctx) {
if (!garbage) {
return;
}
if (max_memory_size_ <= 1) {
Free(garbage, event, ctx);
} else {
std::unique_ptr<GarbageQueue> pending_delete_garbages;
{ // lock guard
std::lock_guard<memory::SpinLock> guard(spinlock_);
cur_memory_size_ += garbage->size();
garbages_->push_back(std::move(garbage));
if (cur_memory_size_ >= max_memory_size_) {
cur_memory_size_ = 0;
pending_delete_garbages = std::move(garbages_);
garbages_ = std::make_unique<GarbageQueue>();
}
}
}
}
void InterpreterCoreEventGarbageCollector::Add(
Variable* var, platform::DeviceEvent& event,
const platform::DeviceContext* ctx) {
if (UNLIKELY(max_memory_size_ < 0) || var == nullptr) {
return;
}
if (var->IsType<LoDTensor>()) {
Add(var->GetMutable<LoDTensor>()->MoveMemoryHolder(), event, ctx);
} else if (var->IsType<
operators::reader::
OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
// TODO(xiongkun03): in the old executor, this type of variable does not
// support eager deletion, so we just leave it here.
} else if (var->IsType<LoDRankTable>()) {
// TODO(xiongkun03): in the old executor, this type of variable does not
// support eager deletion, so we just leave it here.
} else if (var->IsType<SelectedRows>()) {
Add(var->GetMutable<SelectedRows>()->mutable_value()->MoveMemoryHolder(),
event, ctx);
var->GetMutable<SelectedRows>()->mutable_rows()->clear();
} else if (var->IsType<LoDTensorArray>()) {
auto* tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& t : *tensor_arr) {
Add(t.MoveMemoryHolder(), event, ctx);
}
} else if (var->IsType<std::vector<Scope*>>()) {
// NOTE(@xiongkun03) conditional_op / while_op will create a STEP_SCOPE
// refer to executor.cc to see what old garbage collector does.
// do nothing, because the sub scope will be deleted by sub-executor.
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"The variable(%s) is not supported in eager deletion.",
framework::ToTypeName(var->Type())));
}
}
void InterpreterCoreEventGarbageCollector::Free(
GarbageQueue* garbages, platform::DeviceEvent& event,
const platform::DeviceContext* ctx) {
event.Record(ctx);
event.SetFininshed(); // Only for CPU Event
queue_->AddTask([ container = garbages, event = &event ]() {
while (!event->Query()) {
#if defined(_WIN32)
SleepEx(50, FALSE);
#else
sched_yield();
#endif
continue;
}
delete container;
});
}
void InterpreterCoreEventGarbageCollector::Free(
Garbage& garbage, platform::DeviceEvent& event,
const platform::DeviceContext* ctx) {
event.Record(ctx);
event.SetFininshed(); // Only for CPU Event
queue_->AddTask([ container = garbage, event = &event ]() {
while (!event->Query()) {
#if defined(_WIN32)
SleepEx(50, FALSE);
#else
sched_yield();
#endif
continue;
}
});
}
} // namespace framework
} // namespace paddle
\ No newline at end of file
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <queue>
#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/workqueue/workqueue.h"
namespace paddle {
namespace framework {
class InterpreterCoreEventGarbageCollector
: public InterpreterCoreGarbageCollector {
public:
InterpreterCoreEventGarbageCollector();
~InterpreterCoreEventGarbageCollector();
virtual void Add(Variable* var, platform::DeviceEvent& event,
const platform::DeviceContext* ctx) override;
private:
void Add(Garbage garbage, platform::DeviceEvent& event,
const platform::DeviceContext* ctx);
void Free(GarbageQueue* garbages, platform::DeviceEvent& event,
const platform::DeviceContext* ctx);
void Free(Garbage& garbage, platform::DeviceEvent& event,
const platform::DeviceContext* ctx);
std::unique_ptr<WorkQueue> queue_;
paddle::memory::SpinLock spinlock_;
};
} // namespace framework
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpretercore_fast_garbage_collector.h"
namespace paddle {
namespace framework {
void InterpreterCoreFastGarbageCollector::Add(Variable* var) {
if (UNLIKELY(max_memory_size_ < 0) || var == nullptr) {
return;
}
if (var->IsType<LoDTensor>()) {
Add(var->GetMutable<LoDTensor>()->MoveMemoryHolder());
} else if (var->IsType<
operators::reader::
OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
// TODO(xiongkun03): in the old executor, this type of variable does not
// support eager deletion, so we just leave it here.
} else if (var->IsType<LoDRankTable>()) {
// TODO(xiongkun03): in the old executor, this type of variable does not
// support eager deletion, so we just leave it here.
} else if (var->IsType<SelectedRows>()) {
Add(var->GetMutable<SelectedRows>()->mutable_value()->MoveMemoryHolder());
var->GetMutable<SelectedRows>()->mutable_rows()->clear();
} else if (var->IsType<LoDTensorArray>()) {
auto* tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& t : *tensor_arr) {
Add(t.MoveMemoryHolder());
}
} else if (var->IsType<std::vector<Scope*>>()) {
// NOTE(@xiongkun03) conditional_op / while_op will create a STEP_SCOPE
// refer to executor.cc to see what old garbage collector does.
// do nothing, because the sub scope will be deleted by sub-executor.
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"The variable(%s) is not supported in eager deletion.",
framework::ToTypeName(var->Type())));
}
}
void InterpreterCoreFastGarbageCollector::Add(Garbage garbage) {
if (!garbage) {
return;
}
if (max_memory_size_ > 1) {
std::unique_ptr<GarbageQueue> pending_delete_garbages;
{ // lock guard
std::lock_guard<memory::SpinLock> guard(spinlock_);
cur_memory_size_ += garbage->size();
garbages_->push_back(std::move(garbage));
if (cur_memory_size_ >= max_memory_size_) {
cur_memory_size_ = 0;
pending_delete_garbages = std::move(garbages_);
garbages_ = std::make_unique<GarbageQueue>();
}
}
}
}
} // namespace framework
} // namespace paddle
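Both collectors inherit the same batching policy from InterpreterCoreGarbageCollector: max_memory_size_ comes from GetEagerDeletionThreshold(), a negative value disables collection, a value of at most 1 releases every garbage immediately, and anything larger batches garbage until cur_memory_size_ reaches the threshold. A small standalone sketch of that policy, as read from the diff (not the diff's own code):

// Standalone sketch of the shared batching rule.
bool ShouldFlush(int64_t max_memory_size, int64_t* cur_memory_size,
                 int64_t garbage_size) {
  if (max_memory_size < 0) return false;  // GC disabled, Add() returns early
  if (max_memory_size <= 1) return true;  // eager mode: release each garbage at once
  *cur_memory_size += garbage_size;
  if (*cur_memory_size >= max_memory_size) {
    *cur_memory_size = 0;  // the whole batch is handed over for deletion
    return true;
  }
  return false;
}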
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h"
namespace paddle {
namespace framework {
class InterpreterCoreFastGarbageCollector
: public InterpreterCoreGarbageCollector {
public:
virtual void Add(Variable* var) override;
private:
void Add(Garbage garbage);
};
} // namespace framework
} // namespace paddle
@@ -19,114 +19,23 @@ namespace paddle {
namespace framework {
InterpreterCoreGarbageCollector::InterpreterCoreGarbageCollector() {
garbages_.reset(new GarbageQueue());
max_memory_size_ = static_cast<size_t>(GetEagerDeletionThreshold());
garbages_ = std::make_unique<GarbageQueue>();
max_memory_size_ = static_cast<int64_t>(GetEagerDeletionThreshold());
cur_memory_size_ = 0;
WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ false);
queue_ = CreateSingleThreadedWorkQueue(options);
}
InterpreterCoreGarbageCollector::~InterpreterCoreGarbageCollector() {
queue_.reset(nullptr);
}
void InterpreterCoreGarbageCollector::Add(
std::shared_ptr<memory::Allocation> garbage,
paddle::platform::DeviceEvent& event, const platform::DeviceContext* ctx) {
if (max_memory_size_ <= 1) {
Free(garbage, event, ctx);
} else {
if (!garbage) return;
GarbageQueue* garbage_ptr = nullptr;
{
std::lock_guard<paddle::memory::SpinLock> guard(spinlock_);
cur_memory_size_ += garbage->size();
garbages_->push_back(std::move(garbage));
if (cur_memory_size_ >= max_memory_size_) {
cur_memory_size_ = 0;
garbage_ptr = garbages_.release();
garbages_.reset(new GarbageQueue());
}
}
if (garbage_ptr) {
Free(garbage_ptr, event, ctx);
}
}
void InterpreterCoreGarbageCollector::Add(Variable* var) {
PADDLE_THROW(
platform::errors::Unimplemented("Not allowed to call the member function "
"of InterpreterCoreGarbageCollector"));
}
void InterpreterCoreGarbageCollector::Add(paddle::framework::Variable* var,
paddle::platform::DeviceEvent& event,
void InterpreterCoreGarbageCollector::Add(Variable* var,
platform::DeviceEvent& event,
const platform::DeviceContext* ctx) {
if (!var) {
return;
}
if (var->IsType<LoDTensor>()) {
Add(var->GetMutable<LoDTensor>()->MoveMemoryHolder(), event, ctx);
} else if (var->IsType<
operators::reader::
OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
// TODO(xiongkun03): in the old executor, this type of variable does not
// support eager deletion, so we just leave it here.
} else if (var->IsType<LoDRankTable>()) {
// TODO(xiongkun03): in the old executor, this type of variable does not
// support eager deletion, so we just leave it here.
} else if (var->IsType<SelectedRows>()) {
Add(var->GetMutable<SelectedRows>()->mutable_value()->MoveMemoryHolder(),
event, ctx);
var->GetMutable<SelectedRows>()->mutable_rows()->clear();
} else if (var->IsType<LoDTensorArray>()) {
auto* tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& t : *tensor_arr) {
Add(t.MoveMemoryHolder(), event, ctx);
}
} else if (var->IsType<std::vector<Scope*>>()) {
// NOTE(@xiongkun03) conditional_op / while_op will create a STEP_SCOPE
// refer to executor.cc to see what old garbage collector does.
// do nothing, because the sub scope will be deleted by sub-executor.
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"The variable(%s) is not supported in eager deletion.",
framework::ToTypeName(var->Type())));
}
}
void InterpreterCoreGarbageCollector::Free(GarbageQueue* garbages,
paddle::platform::DeviceEvent& event,
const platform::DeviceContext* ctx) {
event.Record(ctx);
event.SetFininshed(); // Only for CPU Event
queue_->AddTask([ container = garbages, event = &event ]() {
while (!event->Query()) {
#if defined(_WIN32)
SleepEx(50, FALSE);
#else
sched_yield();
#endif
continue;
}
delete container;
});
}
void InterpreterCoreGarbageCollector::Free(
std::shared_ptr<memory::Allocation>& garbage,
paddle::platform::DeviceEvent& event, const platform::DeviceContext* ctx) {
event.Record(ctx);
event.SetFininshed(); // Only for CPU Event
queue_->AddTask([ container = garbage, event = &event ]() {
while (!event->Query()) {
#if defined(_WIN32)
SleepEx(50, FALSE);
#else
sched_yield();
#endif
continue;
}
});
PADDLE_THROW(
platform::errors::Unimplemented("Not allowed to call the member function "
"of InterpreterCoreGarbageCollector"));
}
} // namespace framework
......
@@ -13,54 +13,31 @@
// limitations under the License.
#pragma once
#if !defined(_WIN32)
#include <sched.h>
#else
#define NOMINMAX
#include <windows.h>
#endif // !_WIN32
#include <queue>
#include <vector>
#include "paddle/fluid/framework/new_executor/workqueue/workqueue.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"
#include "paddle/fluid/platform/device_event.h"
namespace paddle {
namespace framework {
using GarbageQueue = std::deque<std::shared_ptr<memory::Allocation>>;
using Garbage = std::shared_ptr<memory::Allocation>;
using GarbageQueue = std::deque<Garbage>;
class InterpreterCoreGarbageCollector {
public:
InterpreterCoreGarbageCollector();
~InterpreterCoreGarbageCollector();
void Add(std::shared_ptr<memory::Allocation> garbage, // NOLINT
paddle::platform::DeviceEvent& event, // NOLINT
const platform::DeviceContext* ctx);
void Add(paddle::framework::Variable* var,
paddle::platform::DeviceEvent& event, // NOLINT
virtual ~InterpreterCoreGarbageCollector(){};
virtual void Add(Variable* var);
virtual void Add(Variable* var, platform::DeviceEvent& event,
const platform::DeviceContext* ctx);
DISABLE_COPY_AND_ASSIGN(InterpreterCoreGarbageCollector);
private:
void Free(GarbageQueue* garbages,
paddle::platform::DeviceEvent& event, // NOLINT
const platform::DeviceContext* ctx);
void Free(std::shared_ptr<memory::Allocation>& garbage, // NOLINT
paddle::platform::DeviceEvent& event, // NOLINT
const platform::DeviceContext* ctx);
protected:
std::unique_ptr<GarbageQueue> garbages_;
size_t max_memory_size_;
size_t cur_memory_size_;
std::unique_ptr<WorkQueue> queue_;
paddle::memory::SpinLock spinlock_;
int64_t max_memory_size_;
int64_t cur_memory_size_;
memory::SpinLock spinlock_;
};
} // namespace framework
} // namespace paddle
@@ -98,7 +98,9 @@ void* Tensor::mutable_data(const platform::Place& place,
/* some versions of boost::variant don't have operator!= */
if (holder_ == nullptr || !(holder_->place() == place) ||
holder_->size() < size + offset_) {
holder_->size() < size + offset_ ||
!(platform::is_gpu_place(place) &&
memory::InSameStream(holder_, stream))) {
holder_.reset();
holder_ = memory::AllocShared(place, size, stream);
offset_ = 0;
......
@@ -26,6 +26,7 @@
#include "paddle/fluid/platform/place.h"
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
#include <shared_mutex>
#include "paddle/fluid/memory/allocation/cuda_allocator.h"
#include "paddle/fluid/memory/allocation/pinned_allocator.h"
#include "paddle/fluid/memory/allocation/stream_safe_cuda_allocator.h"
@@ -151,11 +152,12 @@ class AllocatorFacadePrivate {
}
#endif
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
if (FLAGS_use_stream_safe_cuda_allocator) {
LOG(WARNING) << "FLAGS_use_stream_safe_cuda_allocator is invalid for "
"naive_best_fit strategy";
FLAGS_use_stream_safe_cuda_allocator = false;
}
PADDLE_ENFORCE_EQ(
FLAGS_use_stream_safe_cuda_allocator, false,
paddle::platform::errors::Unimplemented(
"StreamSafeCUDAAllocator is only implemented for auto_growth "
"strategy, not support naive_best_fit strategy"));
for (int dev_id = 0; dev_id < platform::GetGPUDeviceCount(); ++dev_id) {
InitNaiveBestFitCUDAAllocator(platform::CUDAPlace(dev_id));
}
@@ -185,9 +187,6 @@ class AllocatorFacadePrivate {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
allow_free_idle_chunk_ = allow_free_idle_chunk;
if (FLAGS_use_stream_safe_cuda_allocator) {
default_streams_ =
std::vector<gpuStream_t>(platform::GetGPUDeviceCount(), nullptr);
// TODO(Ruibiao): Support multi-stream allocator for other strategies
for (int dev_id = 0; dev_id < platform::GetGPUDeviceCount();
++dev_id) {
InitStreamSafeCUDAAllocator(platform::CUDAPlace(dev_id), nullptr);
@@ -232,11 +231,11 @@ class AllocatorFacadePrivate {
}
#endif
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
if (FLAGS_use_stream_safe_cuda_allocator) {
LOG(WARNING) << "FLAGS_use_stream_safe_cuda_allocator is invalid for "
"thread_local strategy";
FLAGS_use_stream_safe_cuda_allocator = false;
}
PADDLE_ENFORCE_EQ(
FLAGS_use_stream_safe_cuda_allocator, false,
paddle::platform::errors::Unimplemented(
"StreamSafeCUDAAllocator is only implemented for auto_growth "
"strategy, not support thread_local strategy"));
for (int dev_id = 0; dev_id < platform::GetGPUDeviceCount(); ++dev_id) {
InitThreadLocalCUDAAllocator(platform::CUDAPlace(dev_id));
@@ -282,50 +281,45 @@ class AllocatorFacadePrivate {
}
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
bool HasCUDAAllocator(const platform::CUDAPlace& place,
const gpuStream_t& stream) {
auto it = cuda_allocators_.find(place);
if (it == cuda_allocators_.end()) {
return false;
}
const std::map<gpuStream_t, std::shared_ptr<Allocator>>& allocator_map =
it->second;
return allocator_map.find(stream) != allocator_map.end();
}
const std::shared_ptr<Allocator>& GetAllocator(
const platform::CUDAPlace& place, const gpuStream_t& stream,
bool create_if_not_found = false) {
auto place_it = cuda_allocators_.find(place);
PADDLE_ENFORCE_NE(place_it, cuda_allocators_.end(),
{ // shared_lock_guard
std::shared_lock<std::shared_timed_mutex> lock_guard(
cuda_allocator_mutex_);
if (LIKELY(HasCUDAAllocator(place, stream))) {
return cuda_allocators_[place][stream];
} else {
PADDLE_ENFORCE_NE(create_if_not_found, false,
platform::errors::NotFound(
"No allocator found for the place %s", place));
"No allocator found for stream %s in place %s "
"with create_if_not_found = false",
stream, place));
}
}
const std::map<gpuStream_t, std::shared_ptr<Allocator>>& allocator_map =
place_it->second;
auto stream_it = allocator_map.find(stream);
if (stream_it == allocator_map.end()) {
if (create_if_not_found) {
{ // unique_lock_guard
std::unique_lock<std::shared_timed_mutex> lock_guard(
cuda_allocator_mutex_);
InitStreamSafeCUDAAllocator(place, stream);
return cuda_allocators_[place][stream];
} else {
PADDLE_THROW(platform::errors::NotFound(
"No allocator found for stream %s in place %s", stream, place));
}
}
return stream_it->second;
}
const gpuStream_t& GetDefaultStream(const platform::CUDAPlace& place) {
int dev_id = place.GetDeviceId();
gpuStream_t& default_stream = default_streams_[dev_id];
if (UNLIKELY(default_stream == nullptr)) {
/* NOTE(Ruibiao): If we set default_stream here via "default_stream =
* platform::stream::get_current_stream(place.GetDeviceId())->raw_stream()",
* building the target 'jit_kernel_benchmark' fails with an undefined
* reference to `paddle::platform::DeviceContextPool::Get(
* paddle::platform::Place const&)' in function
* `paddle::platform::stream::get_current_stream(int)'. The target
* allocator_facade, however, is not affected. It seems that a circular
* dependency between 'cuda_stream' and 'device_context' causes this
* strange bug.
*/
platform::DeviceContextPool& pool =
platform::DeviceContextPool::Instance();
default_stream =
static_cast<platform::CUDADeviceContext*>(pool.Get(place))->stream();
InitStreamSafeCUDAAllocator(place, default_stream);
}
return default_stream;
gpuStream_t GetDefaultStream(const platform::CUDAPlace& place) {
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
return static_cast<platform::CUDADeviceContext*>(pool.Get(place))->stream();
}
void RecordStream(std::shared_ptr<Allocation> allocation,
@@ -443,18 +437,12 @@ class AllocatorFacadePrivate {
"Only support auto-growth strategey for StreamSafeCUDAAllocator, "
"the allocator strategy %d is unsupported for multi-stream",
static_cast<int>(strategy_)));
VLOG(9) << "Init CUDA allocator for stream " << stream << " in place " << p;
std::lock_guard<SpinLock> lock_guard(cuda_allocators_lock_);
try {
GetAllocator(p, stream);
VLOG(9) << "Other thread had build a allocator for stream " << stream
<< " in place " << p;
} catch (platform::EnforceNotMet&) {
if (LIKELY(!HasCUDAAllocator(p, stream))) {
VLOG(8) << "Init CUDA allocator for stream " << stream << " in place "
<< p;
InitAutoGrowthCUDAAllocator(p, stream);
WrapStreamSafeCUDAAllocator(p, stream);
WrapCUDARetryAllocator(p, stream, FLAGS_gpu_allocator_retry_time);
} catch (...) {
throw;
}
}
@@ -618,7 +606,7 @@ class AllocatorFacadePrivate {
void WrapStreamSafeCUDAAllocator(platform::CUDAPlace p, gpuStream_t stream) {
const std::shared_ptr<Allocator>& underlying_allocator =
GetAllocator(p, stream);
cuda_allocators_[p][stream];
cuda_allocators_[p][stream] = std::make_shared<StreamSafeCUDAAllocator>(
underlying_allocator, p, stream);
}
@@ -629,7 +617,7 @@ class AllocatorFacadePrivate {
retry_time, 0,
platform::errors::InvalidArgument(
"Retry time should be larger than 0, but got %d", retry_time));
std::shared_ptr<Allocator> allocator = GetAllocator(p, stream);
std::shared_ptr<Allocator> allocator = cuda_allocators_[p][stream];
allocator = std::make_shared<RetryAllocator>(allocator, retry_time);
}
@@ -784,8 +772,7 @@ class AllocatorFacadePrivate {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
// a standalone CUDA allocator to support multi-stream GC in new executor
CUDAAllocatorMap cuda_allocators_;
std::vector<gpuStream_t> default_streams_;
SpinLock cuda_allocators_lock_;
std::shared_timed_mutex cuda_allocator_mutex_;
#ifdef PADDLE_WITH_CUDA
std::unordered_map<CUDAGraphID, std::unique_ptr<AllocatorFacadePrivate>>
cuda_graph_allocator_map_;
@@ -903,6 +890,31 @@ std::shared_ptr<Allocation> AllocatorFacade::AllocShared(
#endif
}
bool AllocatorFacade::InSameStream(
const std::shared_ptr<Allocation>& allocation,
const platform::Stream& stream) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
PADDLE_ENFORCE_EQ(
FLAGS_use_stream_safe_cuda_allocator, true,
platform::errors::Unimplemented(
"StreamSafeCUDAAllocator is disabled, you should not call this "
"multi-stream 'InSameStream' function. To enable it, you can enter"
"'export FLAGS_use_stream_safe_cuda_allocator=true' in the "
"terminal."));
#ifdef PADDLE_WITH_CUDA
if (UNLIKELY(platform::CUDAGraph::IsCapturing())) {
PADDLE_THROW(platform::errors::Unavailable(
"Not allow to use StreamSafeCUDAAllocator with CUDAGraphAllocator"));
}
#endif
gpuStream_t s = reinterpret_cast<gpuStream_t>(stream.id());
return s == GetStream(allocation);
#else
PADDLE_THROW(platform::errors::PreconditionNotMet("Not compiled with GPU."));
#endif
}
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
AllocationPtr AllocatorFacade::Alloc(const platform::Place& place, size_t size,
const gpuStream_t& stream) {
......
@@ -61,6 +61,10 @@ class AllocatorFacade {
std::shared_ptr<Allocation> AllocShared(const platform::Place& place,
size_t size,
const platform::Stream& stream);
bool InSameStream(const std::shared_ptr<Allocation>& allocation,
const platform::Stream& stream);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
// TODO(zhiqiu): change gpuStream_t to platform::Stream if needed.
AllocationPtr Alloc(const platform::Place& place, size_t size,
......
@@ -118,6 +118,7 @@ bool StreamSafeCUDAAllocator::IsAllocThreadSafe() const { return true; }
Allocation* StreamSafeCUDAAllocator::AllocateImpl(size_t size) {
ProcessUnfreedAllocations();
VLOG(8) << "Try allocate " << size << " bytes";
AllocationPtr underlying_allocation;
try {
underlying_allocation = underlying_allocator_->Allocate(size);
@@ -150,10 +151,12 @@ void StreamSafeCUDAAllocator::FreeImpl(Allocation* allocation) {
"StreamSafeCUDAAllocation*",
allocation));
VLOG(8) << "Try free allocation " << stream_safe_cuda_allocation->ptr();
std::lock_guard<SpinLock> lock_guard(unfreed_allocation_lock_);
if (stream_safe_cuda_allocation->CanBeFreed()) {
VLOG(9) << "Directly delete allocation";
delete stream_safe_cuda_allocation;
} else {
std::lock_guard<SpinLock> lock_guard(unfreed_allocation_lock_);
VLOG(9) << "Put into unfreed_allocation list";
unfreed_allocations_.emplace_back(stream_safe_cuda_allocation);
}
}
......
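The hunk above also shows the allocator's contract for deferred frees: a block whose recorded streams may still be running is parked in unfreed_allocations_, and ProcessUnfreedAllocations (called at the top of AllocateImpl) reclaims it once CanBeFreed() reports that all recorded stream events have completed. A hedged, generic sketch of that deferral loop (the real implementation may differ):

#include <list>

// `AllocT` stands in for StreamSafeCUDAAllocation; it only needs CanBeFreed().
template <typename AllocT>
void ProcessUnfreedSketch(std::list<AllocT*>* unfreed) {
  for (auto it = unfreed->begin(); it != unfreed->end();) {
    if ((*it)->CanBeFreed()) {
      delete *it;               // all recorded streams are done, free for real
      it = unfreed->erase(it);
    } else {
      ++it;                     // still in use by some stream, check again later
    }
  }
}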
@@ -41,6 +41,12 @@ std::shared_ptr<Allocation> AllocShared(const platform::Place& place,
stream);
}
bool InSameStream(const std::shared_ptr<Allocation>& allocation,
const platform::Stream& stream) {
return allocation::AllocatorFacade::Instance().InSameStream(allocation,
stream);
}
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
AllocationPtr Alloc(const platform::CUDAPlace& place, size_t size,
const gpuStream_t& stream) {
......
@@ -45,6 +45,9 @@ extern std::shared_ptr<Allocation> AllocShared(const platform::Place& place,
size_t size,
const platform::Stream& stream);
extern bool InSameStream(const std::shared_ptr<Allocation>& allocation,
const platform::Stream& stream);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
extern AllocationPtr Alloc(const platform::CUDAPlace& place, size_t size,
const gpuStream_t& stream);
......
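The new memory::InSameStream helper declared above is what the Tensor::mutable_data change earlier in this diff relies on to decide whether an existing holder can be reused for a given stream. A hedged usage sketch, assuming platform::Stream can be constructed from a raw stream cast to platform::StreamId:

// Sketch only: reuse a shared allocation only if it was allocated for this stream.
platform::Stream s1(reinterpret_cast<platform::StreamId>(cuda_stream));
std::shared_ptr<memory::Allocation> holder = memory::AllocShared(place, size, s1);

platform::Stream s2(reinterpret_cast<platform::StreamId>(other_stream));
if (!memory::InSameStream(holder, s2)) {
  holder = memory::AllocShared(place, size, s2);  // reallocate for the other stream
}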
@@ -2,7 +2,8 @@ file(GLOB TEST_INTERP_CASES RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_INTERP_CASES "${TEST_INTERP_CASES}")
foreach(target ${TEST_INTERP_CASES})
py_test_modules(${target} MODULES ${target})
py_test_modules(${target} MODULES ${target} ENVS FLAGS_allocator_strategy=auto_growth FLAGS_use_stream_safe_cuda_allocator=true FLAGS_fast_eager_deletion_mode=false FLAGS_eager_delete_tensor_gb=0)
py_test_modules(${target}_non_eager_deletion MODULES ${target} ENVS FLAGS_allocator_strategy=auto_growth FLAGS_use_stream_safe_cuda_allocator=true FLAGS_fast_eager_deletion_mode=false FLAGS_eager_delete_tensor_gb=0.000001)
py_test_modules(${target}_fast_gc MODULES ${target} ENVS FLAGS_allocator_strategy=auto_growth FLAGS_use_stream_safe_cuda_allocator=true FLAGS_fast_eager_deletion_mode=true FLAGS_eager_delete_tensor_gb=0)
py_test_modules(${target}_fast_gc_non_eager_deletion MODULES ${target} ENVS FLAGS_allocator_strategy=auto_growth FLAGS_use_stream_safe_cuda_allocator=true FLAGS_fast_eager_deletion_mode=true FLAGS_eager_delete_tensor_gb=0.000001)
endforeach()
set_tests_properties(test_standalone_executor PROPERTIES ENVIRONMENT FLAGS_use_stream_safe_cuda_allocator=true)