未验证 提交 73e441f9 编写于 作者: R ronnywang 提交者: GitHub

[CustomDevice] add stream safe allocator support (#55393)

上级 95aab366
......@@ -51,7 +51,8 @@ if(UNIX AND NOT APPLE)
endif()
if(WITH_CUSTOM_DEVICE)
list(APPEND ALLOCATOR_SRCS custom_allocator.cc)
list(APPEND ALLOCATOR_SRCS custom_allocator.cc
stream_safe_custom_device_allocator.cc)
endif()
if(WITH_XPU)
......
......@@ -61,9 +61,9 @@
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
#include "paddle/fluid/memory/allocation/custom_allocator.h"
#include "paddle/fluid/platform/device/device_wrapper.h"
#include "paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.h"
#endif
#include "paddle/fluid/platform/flags.h"
PADDLE_DEFINE_EXPORTED_int64(
......@@ -174,6 +174,11 @@ class AllocatorFacadePrivate {
std::map<platform::XPUPlace,
std::map<XPUStream, std::shared_ptr<Allocator>>>;
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
using CustomDeviceAllocatorMap =
std::map<platform::CustomPlace,
std::map<phi::stream::stream_t, std::shared_ptr<Allocator>>>;
#endif
explicit AllocatorFacadePrivate(bool allow_free_idle_chunk = true) {
strategy_ = GetAllocatorStrategy();
......@@ -564,6 +569,46 @@ class AllocatorFacadePrivate {
}
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
bool HasCustomDevice(const platform::CustomPlace& place,
phi::stream::stream_t stream) {
auto it = custom_device_allocators_.find(place);
if (it == custom_device_allocators_.end()) {
return false;
}
auto& allocator_map = it->second;
return allocator_map.find(stream) != allocator_map.end();
}
const std::shared_ptr<Allocator>& GetAllocator(
const platform::CustomPlace& place,
phi::stream::stream_t stream,
bool create_if_not_found = false) {
/* shared_lock_guard */ {
std::shared_lock<std::shared_timed_mutex> lock_guard(
custom_device_allocator_mutex_);
if (LIKELY(HasCustomDevice(place, stream))) {
return custom_device_allocators_[place][stream];
} else {
PADDLE_ENFORCE_NE(create_if_not_found,
false,
platform::errors::NotFound(
"No allocator found for stream %s in place %s "
"with create_if_not_found = false",
stream,
place));
}
}
/* unique_lock_guard */ {
std::unique_lock<std::shared_timed_mutex> lock_guard(
custom_device_allocator_mutex_);
InitStreamSafeCustomDeviceAllocator(place, stream);
return custom_device_allocators_[place][stream];
}
}
#endif
private:
class ZeroSizeAllocator : public Allocator {
public:
......@@ -1008,9 +1053,17 @@ class AllocatorFacadePrivate {
allocators_[p] = std::make_shared<NaiveBestFitAllocator>(p);
}
void InitNaiveBestFitCustomDeviceAllocator(platform::CustomPlace p,
phi::stream::stream_t stream) {
custom_device_allocators_[p][stream] =
std::make_shared<NaiveBestFitAllocator>(p);
}
void InitAutoGrowthCustomDeviceAllocator(platform::CustomPlace p,
bool allow_free_idle_chunk) {
auto chunk_size = FLAGS_auto_growth_chunk_size_in_mb << 20;
VLOG(4) << "FLAGS_auto_growth_chunk_size_in_mb is "
<< FLAGS_auto_growth_chunk_size_in_mb;
auto custom_allocator =
std::make_shared<paddle::memory::allocation::CustomAllocator>(p);
allocators_[p] = std::make_shared<AutoGrowthBestFitAllocator>(
......@@ -1019,6 +1072,40 @@ class AllocatorFacadePrivate {
/*chunk_size=*/chunk_size,
allow_free_idle_chunk);
}
void InitAutoGrowthCustomDeviceAllocator(platform::CustomPlace p,
phi::stream::stream_t stream) {
auto chunk_size = FLAGS_auto_growth_chunk_size_in_mb << 20;
VLOG(4) << "FLAGS_auto_growth_chunk_size_in_mb is "
<< FLAGS_auto_growth_chunk_size_in_mb;
auto custom_allocator =
std::make_shared<paddle::memory::allocation::CustomAllocator>(p);
auto alignment = phi::DeviceManager::GetMinChunkSize(p);
custom_device_allocators_[p][stream] =
std::make_shared<AutoGrowthBestFitAllocator>(
custom_allocator, alignment, chunk_size, allow_free_idle_chunk_);
}
void WrapStreamSafeCustomDeviceAllocator(platform::CustomPlace p,
phi::stream::stream_t stream) {
std::shared_ptr<Allocator>& allocator =
custom_device_allocators_[p][stream];
allocator =
std::make_shared<StreamSafeCustomDeviceAllocator>(allocator, p, stream);
}
void InitStreamSafeCustomDeviceAllocator(platform::CustomPlace p,
phi::stream::stream_t stream) {
VLOG(8) << "Init CustomDevice allocator for stream " << stream
<< " in place " << p;
if (strategy_ == AllocatorStrategy::kAutoGrowth) {
InitAutoGrowthCustomDeviceAllocator(p, stream);
} else {
InitNaiveBestFitCustomDeviceAllocator(p, stream);
}
WrapStreamSafeCustomDeviceAllocator(p, stream);
}
#endif
void InitSystemAllocators() {
......@@ -1161,6 +1248,15 @@ class AllocatorFacadePrivate {
std::shared_timed_mutex xpu_allocator_mutex_;
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
// a standalone custom device allocator to support multi-stream GC in new
// executor
std::map<platform::Place, std::shared_ptr<StreamSafeCustomDeviceAllocator>>
default_stream_safe_custom_device_allocators_;
CustomDeviceAllocatorMap custom_device_allocators_;
std::shared_timed_mutex custom_device_allocator_mutex_;
#endif
AllocatorStrategy strategy_;
AllocatorMap allocators_;
static AllocatorMap zero_size_allocators_;
......@@ -1252,6 +1348,16 @@ std::shared_ptr<phi::Allocation> AllocatorFacade::AllocShared(
AllocationPtr AllocatorFacade::Alloc(const platform::Place& place,
size_t size,
const phi::Stream& stream) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
if (platform::is_custom_place(place)) {
platform::CustomPlace p(place);
phi::stream::stream_t s =
reinterpret_cast<phi::stream::stream_t>(stream.id());
return GetPrivate()
->GetAllocator(p, s, /* create_if_not_found = */ true)
->Allocate(size);
}
#endif
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
AllocatorFacadePrivate* m = GetPrivate();
if (!m->IsStreamSafeCUDAAllocatorUsed()) {
......@@ -1270,8 +1376,8 @@ AllocationPtr AllocatorFacade::Alloc(const platform::Place& place,
#elif defined(PADDLE_WITH_XPU)
return GetAllocator(place)->Allocate(size);
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with GPU or XPU."));
PADDLE_THROW(platform::errors::PreconditionNotMet(
"Not compiled with GPU or XPU or CustomDevice."));
#endif
}
......@@ -1376,6 +1482,19 @@ void AllocatorFacade::RemoveMemoryPoolOfCUDAGraph(int64_t id) {
#endif
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
const std::shared_ptr<Allocator>& AllocatorFacade::GetAllocator(
const platform::Place& place, phi::stream::stream_t stream) {
AllocatorFacadePrivate* m = GetPrivate();
if (!FLAGS_use_stream_safe_cuda_allocator) {
return m->GetAllocator(place,
stream,
/*create_if_not_found=*/true);
}
return m->GetAllocator(place, /* A non-zero num to choose allocator_ */ 1);
}
#endif
UNUSED static std::shared_ptr<NaiveBestFitAllocator> unused_obj =
std::make_shared<NaiveBestFitAllocator>(platform::CPUPlace());
......
......@@ -25,6 +25,11 @@
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/core/stream.h"
#ifdef PADDLE_WITH_CUSTOM_DEVICE
#include "paddle/fluid/memory/allocation/custom_allocator.h"
#include "paddle/phi/backends/device_manager.h"
#endif
namespace paddle {
namespace memory {
namespace allocation {
......@@ -91,6 +96,10 @@ class AllocatorFacade {
void RemoveMemoryPoolOfCUDAGraph(int64_t id);
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
const std::shared_ptr<Allocator>& GetAllocator(const platform::Place& place,
phi::stream::stream_t stream);
#endif
// TODO(yy): Allocate a Copy-On-Write allocation?
private:
AllocatorFacade();
......
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.h"
#include <thread>
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace memory {
namespace allocation {
StreamSafeCustomDeviceAllocation::StreamSafeCustomDeviceAllocation(
DecoratedAllocationPtr underlying_allocation,
phi::stream::stream_t owning_stream,
StreamSafeCustomDeviceAllocator* allocator)
: Allocation(underlying_allocation->ptr(),
underlying_allocation->base_ptr(),
underlying_allocation->size(),
underlying_allocation->place()),
underlying_allocation_(std::move(underlying_allocation)),
owning_stream_(std::move(owning_stream)),
allocator_(allocator->shared_from_this()) {}
void StreamSafeCustomDeviceAllocation::RecordStream(
phi::stream::stream_t stream) {
VLOG(8) << "Try record stream " << stream << " for address " << ptr();
if (stream == owning_stream_) {
return;
}
std::call_once(once_flag_, [this] { phi::DeviceManager::SetDevice(place_); });
std::lock_guard<SpinLock> lock_guard(outstanding_event_map_lock_);
auto it = outstanding_event_map_.find(stream);
if (it == outstanding_event_map_.end()) {
outstanding_event_map_[stream].Init(place());
VLOG(9) << "Create a new event "
<< outstanding_event_map_[stream].raw_event();
auto stream_wrapper = phi::stream::Stream(place(), stream);
VLOG(8) << "Record event " << it->second.raw_event() << " to stream "
<< stream;
outstanding_event_map_[stream].Record(&stream_wrapper);
}
}
void StreamSafeCustomDeviceAllocation::MarkAsWillBeFreed() {
std::call_once(once_flag_, [this] { phi::DeviceManager::SetDevice(place_); });
std::lock_guard<SpinLock> lock_guard(outstanding_event_map_lock_);
if (!will_be_freed_) {
will_be_freed_ = false;
VLOG(8) << "ptr: " << ptr() << " will be freed";
if (phi::DeviceManager::HasDeviceType(place_.GetDeviceType()) &&
outstanding_event_map_.find(owning_stream_) ==
outstanding_event_map_.end()) {
outstanding_event_map_[owning_stream_].Init(place_);
VLOG(9) << "Create a new event "
<< outstanding_event_map_[owning_stream_].raw_event();
auto stream_wrapper = phi::stream::Stream(place_, owning_stream_);
VLOG(8) << "Record event "
<< outstanding_event_map_[owning_stream_].raw_event()
<< " to stream " << owning_stream_;
outstanding_event_map_[owning_stream_].Record(&stream_wrapper);
}
}
}
bool StreamSafeCustomDeviceAllocation::CanBeFreed() {
std::call_once(once_flag_, [this] { phi::DeviceManager::SetDevice(place_); });
std::lock_guard<SpinLock> lock_guard(outstanding_event_map_lock_);
if (!phi::DeviceManager::HasDeviceType(place_.GetDeviceType())) {
return true;
}
for (auto it = outstanding_event_map_.begin();
it != outstanding_event_map_.end();
++it) {
auto& event = it->second;
if (!event.Query()) {
VLOG(9) << "Event " << event.raw_event() << " for " << ptr()
<< " is not completed";
return false;
}
VLOG(8) << "Destroy event " << event.raw_event();
outstanding_event_map_.erase(outstanding_event_map_.begin(), it);
event.Destroy();
}
return true;
}
phi::stream::stream_t StreamSafeCustomDeviceAllocation::GetOwningStream()
const {
return owning_stream_;
}
StreamSafeCustomDeviceAllocator::StreamSafeCustomDeviceAllocator(
std::shared_ptr<Allocator> underlying_allocator,
platform::CustomPlace place,
phi::stream::stream_t default_stream)
: underlying_allocator_(std::move(underlying_allocator)),
place_(std::move(place)),
default_stream_(std::move(default_stream)) {
std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
allocator_map_[place].emplace_back(this);
}
StreamSafeCustomDeviceAllocator::~StreamSafeCustomDeviceAllocator() {
std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
std::vector<StreamSafeCustomDeviceAllocator*>& allocators =
allocator_map_[place_];
allocators.erase(std::remove(allocators.begin(), allocators.end(), this),
allocators.end());
}
phi::stream::stream_t StreamSafeCustomDeviceAllocator::GetDefaultStream()
const {
return default_stream_;
}
void StreamSafeCustomDeviceAllocator::SetDefaultStream(
phi::stream::stream_t stream) {
default_stream_ = stream;
}
phi::Allocation* StreamSafeCustomDeviceAllocator::AllocateImpl(size_t size) {
platform::RecordEvent record("StreamSafeCustomDeviceAllocator::Allocate",
platform::TracerEventType::UserDefined,
9 /*level*/);
ProcessUnfreedAllocations();
VLOG(8) << "Try allocate " << size << " bytes";
AllocationPtr underlying_allocation;
try {
underlying_allocation = underlying_allocator_->Allocate(size);
} catch (BadAlloc&) {
VLOG(4) << "Allocation failed when allocating " << size << " bytes";
ReleaseImpl(place_);
try {
underlying_allocation = underlying_allocator_->Allocate(size);
} catch (...) {
VLOG(3)
<< "Still allocation failed after release memory from all streams";
throw;
}
} catch (...) {
throw;
}
StreamSafeCustomDeviceAllocation* allocation =
new StreamSafeCustomDeviceAllocation(
static_unique_ptr_cast<Allocation>(std::move(underlying_allocation)),
default_stream_,
this);
VLOG(8) << "Thread " << std::this_thread::get_id() << " Allocate "
<< allocation->size() << " bytes at address " << allocation->ptr()
<< " , stream: " << default_stream_;
return allocation;
}
void StreamSafeCustomDeviceAllocator::FreeImpl(phi::Allocation* allocation) {
platform::RecordEvent record("StreamSafeCustomDeviceAllocator::Free",
platform::TracerEventType::UserDefined,
9 /*level*/);
StreamSafeCustomDeviceAllocation* stream_safe_cuda_allocation =
static_cast<StreamSafeCustomDeviceAllocation*>(allocation);
VLOG(8) << "Try free allocation " << stream_safe_cuda_allocation->ptr();
stream_safe_cuda_allocation->MarkAsWillBeFreed();
if (stream_safe_cuda_allocation->CanBeFreed()) {
VLOG(9) << "Directly delete allocation";
delete stream_safe_cuda_allocation;
} else {
VLOG(9) << "Put into unfreed_allocation list";
std::lock_guard<SpinLock> lock_guard(unfreed_allocation_lock_);
unfreed_allocations_.emplace_back(stream_safe_cuda_allocation);
}
}
uint64_t StreamSafeCustomDeviceAllocator::ReleaseImpl(
const platform::Place& place) {
std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
std::vector<StreamSafeCustomDeviceAllocator*>& allocators =
allocator_map_[place];
uint64_t released_size = 0;
for (StreamSafeCustomDeviceAllocator* allocator : allocators) {
released_size += allocator->ProcessUnfreedAllocationsAndRelease();
}
VLOG(8) << "Release " << released_size << " bytes memory from all streams";
return released_size;
}
void StreamSafeCustomDeviceAllocator::ProcessUnfreedAllocations() {
// NOTE(Ruibiao): This condition is to reduce lock competion. It does not need
// to be thread-safe since here occasional misjudgments are permissible.
if (unfreed_allocations_.empty()) {
return;
}
std::lock_guard<SpinLock> lock_guard(unfreed_allocation_lock_);
for (auto it = unfreed_allocations_.begin();
it != unfreed_allocations_.end();) {
if ((*it)->CanBeFreed()) {
delete *it;
it = unfreed_allocations_.erase(it);
} else {
++it;
}
}
}
uint64_t
StreamSafeCustomDeviceAllocator::ProcessUnfreedAllocationsAndRelease() {
ProcessUnfreedAllocations();
return underlying_allocator_->Release(place_);
}
thread_local std::once_flag StreamSafeCustomDeviceAllocation::once_flag_;
std::map<platform::Place, std::vector<StreamSafeCustomDeviceAllocator*>>
StreamSafeCustomDeviceAllocator::allocator_map_;
SpinLock StreamSafeCustomDeviceAllocator::allocator_map_lock_;
} // namespace allocation
} // namespace memory
} // namespace paddle
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <list>
#include <map>
#include <set>
#include "paddle/fluid/memory/allocation/allocator.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/backends/device_manager.h"
namespace paddle {
namespace memory {
namespace allocation {
class StreamSafeCustomDeviceAllocator;
class StreamSafeCustomDeviceAllocation : public Allocation {
public:
StreamSafeCustomDeviceAllocation(DecoratedAllocationPtr underlying_allocation,
phi::stream::stream_t owning_stream,
StreamSafeCustomDeviceAllocator *allocator);
void RecordStream(phi::stream::stream_t stream);
bool CanBeFreed();
void MarkAsWillBeFreed();
phi::stream::stream_t GetOwningStream() const;
private:
thread_local static std::once_flag once_flag_;
DecoratedAllocationPtr underlying_allocation_;
std::map<phi::stream::stream_t, phi::event::Event> outstanding_event_map_;
phi::stream::stream_t owning_stream_;
SpinLock outstanding_event_map_lock_;
std::shared_ptr<Allocator> allocator_;
bool will_be_freed_{false};
};
class StreamSafeCustomDeviceAllocator
: public Allocator,
public std::enable_shared_from_this<StreamSafeCustomDeviceAllocator> {
public:
StreamSafeCustomDeviceAllocator(
std::shared_ptr<Allocator> underlying_allocator,
platform::CustomPlace place,
phi::stream::stream_t default_stream);
~StreamSafeCustomDeviceAllocator();
bool IsAllocThreadSafe() const override { return true; }
phi::stream::stream_t GetDefaultStream() const;
void SetDefaultStream(phi::stream::stream_t stream);
protected:
phi::Allocation *AllocateImpl(size_t size) override;
void FreeImpl(phi::Allocation *allocation) override;
uint64_t ReleaseImpl(const platform::Place &place) override;
private:
void ProcessUnfreedAllocations();
uint64_t ProcessUnfreedAllocationsAndRelease();
static std::map<platform::Place,
std::vector<StreamSafeCustomDeviceAllocator *>>
allocator_map_;
static SpinLock allocator_map_lock_;
std::shared_ptr<Allocator> underlying_allocator_;
platform::CustomPlace place_;
phi::stream::stream_t default_stream_;
std::list<StreamSafeCustomDeviceAllocation *> unfreed_allocations_;
SpinLock unfreed_allocation_lock_;
};
} // namespace allocation
} // namespace memory
} // namespace paddle
......@@ -108,9 +108,12 @@ inline std::unique_ptr<DeviceContext> CreateDeviceContext(
dev_ctx->SetAllocator(instance.GetAllocator(p).get());
dev_ctx->SetGenerator(phi::DefaultXPUGenerator(p.GetDeviceId()).get());
#endif
#ifdef PADDLE_WITH_CUSTOM_DEVICE
} else if (p.GetType() == phi::AllocationType::CUSTOM) {
dev_ctx->SetAllocator(instance.GetAllocator(p).get());
auto* custom_ctx = dynamic_cast<phi::CustomContext*>(dev_ctx);
dev_ctx->SetAllocator(instance.GetAllocator(p, custom_ctx->stream()).get());
dev_ctx->SetGenerator(phi::DefaultCustomDeviceGenerator(p).get());
#endif
} else {
dev_ctx->SetAllocator(instance.GetAllocator(p).get());
dev_ctx->SetGenerator(phi::DefaultCPUGenerator().get());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册