Unverified commit 168fac13, authored by ykkk2333 and committed by GitHub

xpu support auto growth allocator (#54121)

Parent 78fc636e
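
This change wires XPU into the auto-growth allocation path of the allocator facade: each XPU device gets an AutoGrowthBestFitAllocator built on a new raw XPUAllocator, and when the stream-safe allocator flag is enabled the per-stream allocators are further wrapped with StreamSafeXPUAllocator, RetryAllocator, and StatAllocator, mirroring the existing CUDA setup. The C++ sketch below only illustrates that wrapping order with simplified stand-in classes; it is not the Paddle implementation, which keys allocators by (place, stream) inside AllocatorFacadePrivate and performs the wrapping lazily in InitStreamSafeXPUAllocator.

// Minimal sketch of the allocator wrapping chain introduced by this diff,
// using hypothetical simplified constructors; in the real code each wrapper
// is a phi/fluid Allocator subclass and the map is keyed by (place, stream).
#include <memory>

struct Allocator { virtual ~Allocator() = default; };
struct XPUAllocator : Allocator {};                        // raw xpu_malloc / xpu_free
struct AutoGrowthBestFitAllocator : Allocator {            // auto-growth strategy
  explicit AutoGrowthBestFitAllocator(std::shared_ptr<Allocator>) {}
};
struct StreamSafeXPUAllocator : Allocator {                // per-stream safety via XPU events
  explicit StreamSafeXPUAllocator(std::shared_ptr<Allocator>) {}
};
struct RetryAllocator : Allocator {                        // retry on out-of-memory
  explicit RetryAllocator(std::shared_ptr<Allocator>) {}
};
struct StatAllocator : Allocator {                         // memory statistics
  explicit StatAllocator(std::shared_ptr<Allocator>) {}
};

// Order used by InitStreamSafeXPUAllocator in the diff:
// XPUAllocator -> AutoGrowth -> StreamSafe -> Retry -> Stat.
std::shared_ptr<Allocator> BuildXPUAllocatorChain() {
  std::shared_ptr<Allocator> a = std::make_shared<XPUAllocator>();
  a = std::make_shared<AutoGrowthBestFitAllocator>(a);
  a = std::make_shared<StreamSafeXPUAllocator>(a);
  a = std::make_shared<RetryAllocator>(a);
  a = std::make_shared<StatAllocator>(a);
  return a;
}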
......@@ -56,6 +56,7 @@ endif()
if(WITH_XPU)
list(APPEND ALLOCATOR_DEPS xpu_info)
list(APPEND ALLOCATOR_SRCS xpu_allocator.cc stream_safe_xpu_allocator.cc)
endif()
if(WITH_IPU)
......
......@@ -22,6 +22,7 @@
#include "paddle/fluid/memory/allocation/naive_best_fit_allocator.h"
#include "paddle/fluid/memory/allocation/retry_allocator.h"
#include "paddle/fluid/memory/allocation/stat_allocator.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/core/macros.h"
......@@ -35,7 +36,6 @@
#include "paddle/fluid/memory/allocation/stream_safe_cuda_allocator.h"
#include "paddle/fluid/memory/allocation/thread_local_allocator.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/phi/backends/gpu/gpu_context.h"
#ifdef PADDLE_WITH_CUDA
......@@ -50,7 +50,10 @@
#endif
#ifdef PADDLE_WITH_XPU
#include "paddle/fluid/memory/allocation/stream_safe_xpu_allocator.h"
#include "paddle/fluid/memory/allocation/xpu_allocator.h"
#include "paddle/fluid/platform/device/xpu/xpu_info.h"
#include "paddle/phi/backends/xpu/xpu_context.h"
#endif
#ifdef PADDLE_WITH_IPU
......@@ -166,6 +169,11 @@ class AllocatorFacadePrivate {
std::map<platform::CUDAPlace,
std::map<gpuStream_t, std::shared_ptr<Allocator>>>;
#endif
#ifdef PADDLE_WITH_XPU
using XPUAllocatorMap =
std::map<platform::XPUPlace,
std::map<XPUStream, std::shared_ptr<Allocator>>>;
#endif
explicit AllocatorFacadePrivate(bool allow_free_idle_chunk = true) {
strategy_ = GetAllocatorStrategy();
......@@ -236,9 +244,16 @@ class AllocatorFacadePrivate {
InitNaiveBestFitCUDAPinnedAllocator();
#endif
#ifdef PADDLE_WITH_XPU
allow_free_idle_chunk_ = allow_free_idle_chunk;
for (int dev_id = 0; dev_id < platform::GetXPUDeviceCount(); ++dev_id) {
InitNaiveBestFitXPUAllocator(platform::XPUPlace(dev_id));
InitAutoGrowthXPUAllocator(platform::XPUPlace(dev_id),
allow_free_idle_chunk_);
}
if (FLAGS_use_stream_safe_cuda_allocator) {
WrapStreamSafeXPUAllocatorForDefault();
is_stream_safe_cuda_allocator_used_ = true;
}
#endif
#ifdef PADDLE_WITH_IPU
for (int dev_id = 0; dev_id < platform::GetIPUDeviceCount(); ++dev_id) {
......@@ -441,6 +456,114 @@ class AllocatorFacadePrivate {
}
#endif
#ifdef PADDLE_WITH_XPU
bool HasXPUAllocator(const platform::XPUPlace& place, XPUStream stream) {
auto it = xpu_allocators_.find(place);
if (it == xpu_allocators_.end()) {
return false;
}
const std::map<XPUStream, std::shared_ptr<Allocator>>& allocator_map =
it->second;
return allocator_map.find(stream) != allocator_map.end();
}
const std::shared_ptr<Allocator>& GetAllocator(
const platform::XPUPlace& place,
XPUStream stream,
bool create_if_not_found = false) {
if (stream == GetDefaultStream(place)) {
VLOG(7) << "Get Allocator by passing in a default stream";
return GetAllocator(place, /* A non-zero num to choose allocator_ */ 1);
}
/* shared_lock_guard */ {
std::shared_lock<std::shared_timed_mutex> lock_guard(
xpu_allocator_mutex_);
if (LIKELY(HasXPUAllocator(place, stream))) {
return xpu_allocators_[place][stream];
} else {
PADDLE_ENFORCE_NE(create_if_not_found,
false,
platform::errors::NotFound(
"No allocator found for stream %s in place %s "
"with create_if_not_found = false",
stream,
place));
}
}
/* unique_lock_guard */ {
std::unique_lock<std::shared_timed_mutex> lock_guard(
xpu_allocator_mutex_);
InitStreamSafeXPUAllocator(place, stream);
return xpu_allocators_[place][stream];
}
}
const std::shared_ptr<StreamSafeXPUAllocator>
GetDefaultStreamSafeXPUAllocator(const platform::XPUPlace& place) const {
const auto iter = default_stream_safe_xpu_allocators_.find(place);
PADDLE_ENFORCE_NE(
iter,
default_stream_safe_xpu_allocators_.end(),
platform::errors::NotFound(
"No StreamSafeXPUAllocator found for the place, %s", place));
return iter->second;
}
XPUStream GetDefaultStream(const platform::XPUPlace& place) const {
const std::shared_ptr<StreamSafeXPUAllocator>& allocator =
GetDefaultStreamSafeXPUAllocator(place);
return allocator->GetDefaultStream();
}
void SetDefaultStream(const platform::XPUPlace& place, XPUStream stream) {
const std::shared_ptr<StreamSafeXPUAllocator>& allocator =
GetDefaultStreamSafeXPUAllocator(place);
PADDLE_ENFORCE_EQ(
allocator->GetDefaultStream(),
nullptr,
platform::errors::Unavailable(
"The default stream for StreamSafeXPUAllocator(%p) in %s has been "
"set to %p, not allow to change it to %p.",
allocator.get(),
place,
allocator->GetDefaultStream(),
stream));
allocator->SetDefaultStream(stream);
VLOG(8) << "Set default stream to " << stream
<< " for StreamSafeXPUAllocator(" << allocator.get() << ") in "
<< place;
}
void RecordStream(std::shared_ptr<phi::Allocation> allocation,
XPUStream stream) {
std::shared_ptr<StreamSafeXPUAllocation> stream_safe_xpu_allocation =
std::dynamic_pointer_cast<StreamSafeXPUAllocation>(allocation);
if (stream_safe_xpu_allocation != nullptr) {
stream_safe_xpu_allocation->RecordStream(stream);
} else {
VLOG(6) << "RecordStream for a non-StreamSafeXPUAllocation";
}
}
XPUStream GetStream(
const std::shared_ptr<phi::Allocation>& allocation) const {
const std::shared_ptr<StreamSafeXPUAllocation> stream_safe_xpu_allocation =
std::dynamic_pointer_cast<StreamSafeXPUAllocation>(allocation);
if (stream_safe_xpu_allocation != nullptr) {
return stream_safe_xpu_allocation->GetOwningStream();
}
VLOG(6) << "GetStream for a non-StreamSafeXPUAllocation";
return static_cast<phi::XPUContext*>(
platform::DeviceContextPool::Instance().Get(allocation->place()))
->stream();
}
#endif
private:
class ZeroSizeAllocator : public Allocator {
public:
......@@ -774,6 +897,104 @@ class AllocatorFacadePrivate {
void InitNaiveBestFitXPUAllocator(platform::XPUPlace p) {
allocators_[p] = std::make_shared<NaiveBestFitAllocator>(p);
}
// Create a new XPUAllocator or XPUManagedAllocator for the given device
std::shared_ptr<Allocator> CreateXPUAllocator(platform::XPUPlace p) {
return std::make_shared<XPUAllocator>(p);
}
void InitStreamSafeXPUAllocator(platform::XPUPlace p, XPUStream stream) {
PADDLE_ENFORCE_EQ(
strategy_,
AllocatorStrategy::kAutoGrowth,
platform::errors::Unimplemented(
"Only support auto-growth strategey for StreamSafeXPUAllocator, "
"the allocator strategy %d is unsupported for multi-stream",
static_cast<int>(strategy_)));
if (LIKELY(!HasXPUAllocator(p, stream))) {
VLOG(8) << "Init XPU allocator for stream " << stream << " in place "
<< p;
InitAutoGrowthXPUAllocator(p, stream);
WrapStreamSafeXPUAllocator(p, stream);
WrapXPURetryAllocator(p, stream, FLAGS_gpu_allocator_retry_time);
WrapStatAllocator(p, stream);
}
}
void InitAutoGrowthXPUAllocator(platform::XPUPlace p, XPUStream stream) {
auto chunk_size = FLAGS_auto_growth_chunk_size_in_mb << 6;
VLOG(4) << "FLAGS_auto_growth_chunk_size_in_mb is "
<< FLAGS_auto_growth_chunk_size_in_mb;
auto xpu_allocator = CreateXPUAllocator(p);
auto alignment = platform::XPUMinChunkSize();
std::shared_ptr<Allocator> underlying_allocator{nullptr};
VLOG(10) << "not use AlignedAllocator with alignment: " << alignment;
underlying_allocator = xpu_allocator;
xpu_allocators_[p][stream] = std::make_shared<AutoGrowthBestFitAllocator>(
underlying_allocator, alignment, chunk_size, allow_free_idle_chunk_);
}
void InitAutoGrowthXPUAllocator(platform::XPUPlace p,
bool allow_free_idle_chunk) {
auto chunk_size = FLAGS_auto_growth_chunk_size_in_mb << 6;
VLOG(4) << "FLAGS_auto_growth_chunk_size_in_mb is "
<< FLAGS_auto_growth_chunk_size_in_mb;
auto xpu_allocator = CreateXPUAllocator(p);
auto alignment = platform::XPUMinChunkSize();
std::shared_ptr<Allocator> underlying_allocator{nullptr};
VLOG(10) << "not use AlignedAllocator with alignment: " << alignment;
underlying_allocator = xpu_allocator;
allocators_[p] = std::make_shared<AutoGrowthBestFitAllocator>(
underlying_allocator, alignment, chunk_size, allow_free_idle_chunk);
}
void WrapStreamSafeXPUAllocator(platform::XPUPlace p, XPUStream stream) {
std::shared_ptr<Allocator>& allocator = xpu_allocators_[p][stream];
allocator = std::make_shared<StreamSafeXPUAllocator>(allocator, p, stream);
}
void WrapStreamSafeXPUAllocatorForDefault() {
for (auto& pair : allocators_) {
auto& place = pair.first;
if (platform::is_xpu_place(place)) {
std::shared_ptr<StreamSafeXPUAllocator>&& allocator =
std::make_shared<StreamSafeXPUAllocator>(
pair.second,
place,
/* default_stream = */ nullptr);
pair.second = allocator;
default_stream_safe_xpu_allocators_[place] = allocator;
VLOG(8) << "WrapStreamSafeXPUAllocator for " << place
<< ", allocator address = " << pair.second.get();
}
}
}
void WrapXPURetryAllocator(platform::XPUPlace p,
XPUStream stream,
size_t retry_time) {
PADDLE_ENFORCE_GT(
retry_time,
0,
platform::errors::InvalidArgument(
"Retry time should be larger than 0, but got %d", retry_time));
std::shared_ptr<Allocator>& allocator = xpu_allocators_[p][stream];
allocator = std::make_shared<RetryAllocator>(allocator, retry_time);
}
void WrapStatAllocator(platform::XPUPlace p, XPUStream stream) {
std::shared_ptr<Allocator>& allocator = xpu_allocators_[p][stream];
allocator = std::make_shared<StatAllocator>(allocator);
}
#endif
#ifdef PADDLE_WITH_IPU
......@@ -807,7 +1028,7 @@ class AllocatorFacadePrivate {
int device_count = platform::GetXPUDeviceCount();
for (int i = 0; i < device_count; ++i) {
platform::XPUPlace p(i);
system_allocators_[p] = std::make_shared<NaiveBestFitAllocator>(p);
system_allocators_[p] = CreateXPUAllocator(p);
}
#endif
#ifdef PADDLE_WITH_IPU
......@@ -905,7 +1126,8 @@ class AllocatorFacadePrivate {
platform::errors::InvalidArgument(
"Retry time should be larger than 0, but got %d", retry_time));
for (auto& pair : allocators_) {
if (platform::is_gpu_place(pair.first)) {
if (platform::is_gpu_place(pair.first) ||
platform::is_xpu_place(pair.first)) {
pair.second = std::make_shared<RetryAllocator>(pair.second, retry_time);
}
}
......@@ -930,6 +1152,15 @@ class AllocatorFacadePrivate {
CUDAAllocatorMap cuda_allocators_;
std::shared_timed_mutex cuda_allocator_mutex_;
#endif
#ifdef PADDLE_WITH_XPU
// a standalone XPU allocator to support multi-stream GC in new executor
std::map<platform::Place, std::shared_ptr<StreamSafeXPUAllocator>>
default_stream_safe_xpu_allocators_;
XPUAllocatorMap xpu_allocators_;
std::shared_timed_mutex xpu_allocator_mutex_;
#endif
AllocatorStrategy strategy_;
AllocatorMap allocators_;
static AllocatorMap zero_size_allocators_;
......
......@@ -19,6 +19,9 @@
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#endif
#ifdef PADDLE_WITH_XPU
#include "paddle/fluid/platform/device/xpu/xpu_info.h"
#endif
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/core/stream.h"
......
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/memory/allocation/stream_safe_xpu_allocator.h"
#include <thread>
#include "paddle/fluid/platform/device/xpu/enforce_xpu.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#include "paddle/phi/backends/xpu/xpu_info.h"
namespace paddle {
namespace memory {
namespace allocation {
StreamSafeXPUAllocation::StreamSafeXPUAllocation(
DecoratedAllocationPtr underlying_allocation,
XPUStream owning_stream,
StreamSafeXPUAllocator* allocator)
: Allocation(underlying_allocation->ptr(),
underlying_allocation->base_ptr(),
underlying_allocation->size(),
underlying_allocation->place()),
underlying_allocation_(std::move(underlying_allocation)),
owning_stream_(std::move(owning_stream)),
allocator_(allocator->shared_from_this()) {}
void StreamSafeXPUAllocation::RecordStream(XPUStream stream) {
VLOG(8) << "Try record stream " << stream << " for address " << ptr();
if (stream == owning_stream_) {
return;
}
std::call_once(once_flag_,
[this] { phi::backends::xpu::SetXPUDeviceId(place_.device); });
std::lock_guard<SpinLock> lock_guard(outstanding_event_map_lock_);
RecordStreamPrivate(stream);
}
bool StreamSafeXPUAllocation::CanBeFreed() {
std::call_once(once_flag_,
[this] { phi::backends::xpu::SetXPUDeviceId(place_.device); });
for (auto it = outstanding_event_map_.begin();
it != outstanding_event_map_.end();
++it) {
XPUEvent& event = it->second;
PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_destroy(event));
VLOG(8) << "Destroy event " << event;
}
return true;
}
XPUStream StreamSafeXPUAllocation::GetOwningStream() const {
return owning_stream_;
}
void StreamSafeXPUAllocation::RecordStreamPrivate(XPUStream stream) {
XPUEvent record_event;
auto it = outstanding_event_map_.find(stream);
if (it == outstanding_event_map_.end()) {
XPUEvent new_event;
PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_create(&new_event));
outstanding_event_map_[stream] = new_event;
record_event = new_event;
VLOG(9) << "Create a new event " << new_event;
} else {
record_event = it->second;
VLOG(9) << "Reuse event " << record_event;
}
PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_record(record_event, stream));
VLOG(8) << "Record event " << record_event << " to stream " << stream;
}
StreamSafeXPUAllocator::StreamSafeXPUAllocator(
std::shared_ptr<Allocator> underlying_allocator,
platform::XPUPlace place,
XPUStream default_stream)
: underlying_allocator_(std::move(underlying_allocator)),
place_(std::move(place)),
default_stream_(std::move(default_stream)) {
std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
allocator_map_[place].emplace_back(this);
}
StreamSafeXPUAllocator::~StreamSafeXPUAllocator() {
std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
std::vector<StreamSafeXPUAllocator*>& allocators = allocator_map_[place_];
allocators.erase(std::remove(allocators.begin(), allocators.end(), this),
allocators.end());
}
bool StreamSafeXPUAllocator::IsAllocThreadSafe() const { return true; }
XPUStream StreamSafeXPUAllocator::GetDefaultStream() const {
return default_stream_;
}
void StreamSafeXPUAllocator::SetDefaultStream(XPUStream stream) {
default_stream_ = stream;
}
phi::Allocation* StreamSafeXPUAllocator::AllocateImpl(size_t size) {
platform::RecordEvent record("StreamSafeXPUAllocator::Allocate",
platform::TracerEventType::UserDefined,
9 /*level*/);
ProcessUnfreedAllocations();
VLOG(8) << "Try allocate " << size << " bytes";
AllocationPtr underlying_allocation;
try {
underlying_allocation = underlying_allocator_->Allocate(size);
} catch (BadAlloc&) {
VLOG(4) << "Allocation failed when allocating " << size << " bytes";
ReleaseImpl(place_);
try {
underlying_allocation = underlying_allocator_->Allocate(size);
} catch (...) {
      VLOG(3)
          << "Allocation still failed after releasing memory from all streams";
throw;
}
} catch (...) {
throw;
}
StreamSafeXPUAllocation* allocation = new StreamSafeXPUAllocation(
static_unique_ptr_cast<Allocation>(std::move(underlying_allocation)),
default_stream_,
this);
VLOG(8) << "Thread " << std::this_thread::get_id() << " Allocate "
<< allocation->size() << " bytes at address " << allocation->ptr()
<< " , stream: " << default_stream_;
return allocation;
}
void StreamSafeXPUAllocator::FreeImpl(phi::Allocation* allocation) {
platform::RecordEvent record("StreamSafeXPUAllocator::Free",
platform::TracerEventType::UserDefined,
9 /*level*/);
StreamSafeXPUAllocation* stream_safe_xpu_allocation =
static_cast<StreamSafeXPUAllocation*>(allocation);
VLOG(8) << "Try free allocation " << stream_safe_xpu_allocation->ptr();
if (stream_safe_xpu_allocation->CanBeFreed()) {
VLOG(9) << "Directly delete allocation";
delete stream_safe_xpu_allocation;
} else {
VLOG(9) << "Put into unfreed_allocation list";
std::lock_guard<SpinLock> lock_guard(unfreed_allocation_lock_);
unfreed_allocations_.emplace_back(stream_safe_xpu_allocation);
}
}
uint64_t StreamSafeXPUAllocator::ReleaseImpl(const platform::Place& place) {
std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
std::vector<StreamSafeXPUAllocator*>& allocators = allocator_map_[place];
uint64_t released_size = 0;
for (StreamSafeXPUAllocator* allocator : allocators) {
released_size += allocator->ProcessUnfreedAllocationsAndRelease();
}
VLOG(8) << "Release " << released_size << " bytes memory from all streams";
return released_size;
}
void StreamSafeXPUAllocator::ProcessUnfreedAllocations() {
  // NOTE(Ruibiao): This condition is to reduce lock contention. It does not
  // need to be thread-safe, since occasional misjudgments are permissible here.
if (unfreed_allocations_.empty()) {
return;
}
std::lock_guard<SpinLock> lock_guard(unfreed_allocation_lock_);
for (auto it = unfreed_allocations_.begin();
it != unfreed_allocations_.end();) {
if ((*it)->CanBeFreed()) {
delete *it;
it = unfreed_allocations_.erase(it);
} else {
++it;
}
}
}
uint64_t StreamSafeXPUAllocator::ProcessUnfreedAllocationsAndRelease() {
ProcessUnfreedAllocations();
return underlying_allocator_->Release(place_);
}
thread_local std::once_flag StreamSafeXPUAllocation::once_flag_;
std::map<platform::Place, std::vector<StreamSafeXPUAllocator*>>
StreamSafeXPUAllocator::allocator_map_;
SpinLock StreamSafeXPUAllocator::allocator_map_lock_;
} // namespace allocation
} // namespace memory
} // namespace paddle
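
For readers unfamiliar with the stream-safe pattern above: an allocation owned by one stream may be consumed by others, so RecordStream stores one XPU event per consuming stream, FreeImpl defers the actual free into unfreed_allocations_ whenever CanBeFreed() reports the allocation is still in use, and ProcessUnfreedAllocations() retires deferred entries on the next allocate/release. The following self-contained sketch illustrates that deferred-free pattern with hypothetical Event/StreamId stand-ins (not Paddle types); note that the XPU CanBeFreed() above currently just destroys its recorded events and returns true, whereas the sketch checks event completion before allowing the free, the way the CUDA variant does.

// Self-contained sketch of the deferred-free pattern, with the XPU runtime
// replaced by hypothetical Event / StreamId stand-ins.
#include <iterator>
#include <list>
#include <map>
#include <memory>

struct Event { bool completed = false; };   // stands in for XPUEvent + an event query
using StreamId = int;                       // stands in for XPUStream

struct DemoAllocation {
  std::map<StreamId, Event> outstanding_events;  // one event per recording stream
  void RecordStream(StreamId s) { outstanding_events[s] = Event{}; }
  bool CanBeFreed() const {                      // free only once every event completed
    for (const auto& kv : outstanding_events)
      if (!kv.second.completed) return false;
    return true;
  }
};

struct DemoStreamSafeAllocator {
  std::list<std::unique_ptr<DemoAllocation>> unfreed_allocations;
  void Free(std::unique_ptr<DemoAllocation> a) {
    if (!a->CanBeFreed()) unfreed_allocations.push_back(std::move(a));
    // otherwise the unique_ptr goes out of scope here, i.e. freed immediately
  }
  void ProcessUnfreedAllocations() {             // called before the next allocation
    for (auto it = unfreed_allocations.begin(); it != unfreed_allocations.end();)
      it = (*it)->CanBeFreed() ? unfreed_allocations.erase(it) : std::next(it);
  }
};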
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <list>
#include <map>
#include <set>
#include "paddle/fluid/memory/allocation/allocator.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/device/xpu/xpu_info.h"
#include "paddle/phi/backends/xpu/xpu_context.h"
namespace paddle {
namespace memory {
namespace allocation {
class StreamSafeXPUAllocator;
class StreamSafeXPUAllocation : public Allocation {
public:
StreamSafeXPUAllocation(DecoratedAllocationPtr underlying_allocation,
XPUStream owning_stream,
StreamSafeXPUAllocator *allocator);
void RecordStream(XPUStream stream);
bool CanBeFreed();
XPUStream GetOwningStream() const;
private:
thread_local static std::once_flag once_flag_;
void RecordStreamPrivate(XPUStream stream);
DecoratedAllocationPtr underlying_allocation_;
std::map<XPUStream, XPUEvent> outstanding_event_map_;
XPUStream owning_stream_;
SpinLock outstanding_event_map_lock_;
std::shared_ptr<Allocator> allocator_;
};
class StreamSafeXPUAllocator
: public Allocator,
public std::enable_shared_from_this<StreamSafeXPUAllocator> {
public:
StreamSafeXPUAllocator(std::shared_ptr<Allocator> underlying_allocator,
platform::XPUPlace place,
XPUStream default_stream);
~StreamSafeXPUAllocator();
bool IsAllocThreadSafe() const override;
XPUStream GetDefaultStream() const;
void SetDefaultStream(XPUStream stream);
protected:
phi::Allocation *AllocateImpl(size_t size) override;
void FreeImpl(phi::Allocation *allocation) override;
uint64_t ReleaseImpl(const platform::Place &place) override;
private:
void ProcessUnfreedAllocations();
uint64_t ProcessUnfreedAllocationsAndRelease();
static std::map<platform::Place, std::vector<StreamSafeXPUAllocator *>>
allocator_map_;
static SpinLock allocator_map_lock_;
std::shared_ptr<Allocator> underlying_allocator_;
platform::XPUPlace place_;
XPUStream default_stream_;
std::list<StreamSafeXPUAllocation *> unfreed_allocations_;
SpinLock unfreed_allocation_lock_;
};
} // namespace allocation
} // namespace memory
} // namespace paddle
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/memory/allocation/xpu_allocator.h"
#include <string>
#include "paddle/fluid/platform/device/xpu/xpu_info.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace memory {
namespace allocation {
bool XPUAllocator::IsAllocThreadSafe() const { return true; }
void XPUAllocator::FreeImpl(phi::Allocation* allocation) {
PADDLE_ENFORCE_EQ(
allocation->place(),
place_,
platform::errors::PermissionDenied(
"XPU memory is freed in incorrect device. This may be a bug"));
platform::RecordedXPUFree(
allocation->ptr(), allocation->size(), place_.device);
delete allocation;
}
phi::Allocation* XPUAllocator::AllocateImpl(size_t size) {
std::call_once(once_flag_,
[this] { platform::SetXPUDeviceId(place_.device); });
void* ptr;
auto result = platform::RecordedXPUMalloc(&ptr, size, place_.device);
if (LIKELY(result == XPU_SUCCESS)) {
return new Allocation(ptr, size, platform::Place(place_));
}
PADDLE_THROW_BAD_ALLOC(platform::errors::ResourceExhausted(
"\n\nOut of memory error on XPU %d. "
"Cannot allocate %s memory on XPU %d.\n\n",
place_.device,
string::HumanReadableSize(size),
place_.device));
}
} // namespace allocation
} // namespace memory
} // namespace paddle
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <mutex> // NOLINT
#include "paddle/fluid/memory/allocation/allocator.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
namespace memory {
namespace allocation {
class XPUAllocator : public Allocator {
public:
explicit XPUAllocator(const platform::XPUPlace& place) : place_(place) {}
bool IsAllocThreadSafe() const override;
protected:
void FreeImpl(phi::Allocation* allocation) override;
phi::Allocation* AllocateImpl(size_t size) override;
private:
platform::XPUPlace place_;
std::once_flag once_flag_;
};
} // namespace allocation
} // namespace memory
} // namespace paddle
......@@ -19,6 +19,8 @@ limitations under the License. */
#include "paddle/fluid/platform/device/xpu/enforce_xpu.h"
#include "paddle/fluid/platform/device/xpu/xpu_header.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/lock_guard_ptr.h"
#include "paddle/fluid/platform/monitor.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/backends/xpu/xpu_info.h"
......@@ -92,5 +94,127 @@ phi::backends::xpu::XPUVersion get_xpu_version(int dev_id) {
return phi::backends::xpu::get_xpu_version(dev_id);
}
/**************************** XPU Allocator **************************/
size_t XPUMinChunkSize() { return 1 << 6; }
static void RaiseNonOutOfMemoryError(int status) {
if (status == XPUERR_NOMEM) {
status = XPU_SUCCESS;
}
PADDLE_ENFORCE_XRE_SUCCESS(status);
}
class RecordedXPUMallocHelper {
private:
explicit RecordedXPUMallocHelper(int dev_id, uint64_t limit_size = 0)
: dev_id_(dev_id), limit_size_(limit_size) {
if (NeedRecord()) {
mtx_.reset(new std::mutex());
}
}
DISABLE_COPY_AND_ASSIGN(RecordedXPUMallocHelper);
public:
static RecordedXPUMallocHelper* Instance(int dev_id) {
std::call_once(once_flag_, [] {
int dev_cnt = GetXPUDeviceCount();
instances_.reserve(dev_cnt);
for (int i = 0; i < dev_cnt; ++i) {
// NOTE(zhiqiu): share the flags with gpu, avoid more flags.
instances_.emplace_back(new RecordedXPUMallocHelper(i, 0UL << 20));
}
});
PADDLE_ENFORCE_GE(
dev_id,
0,
phi::errors::OutOfRange(
"Device id must be not less than 0, but got %d.", dev_id));
PADDLE_ENFORCE_LT(
dev_id,
instances_.size(),
phi::errors::OutOfRange("Device id %d exceeds XPU card number %d.",
dev_id,
instances_.size()));
return instances_[dev_id].get();
}
/**
* Try to allocate `size` XPU memory. Only XPUERR_NOMEM
* or XPU_SUCCESS would be returned.
*/
int Malloc(void** ptr, size_t size) {
LockGuardPtr<std::mutex> lock(mtx_);
if (UNLIKELY(NeedRecord() && cur_size_.load() + size > limit_size_)) {
return XPUERR_NOMEM;
}
XPUDeviceGuard guard(dev_id_);
VLOG(10) << "Allocate " << size << " bytes with ptr = " << &(ptr);
auto result = xpu_malloc(ptr, size);
if (result == XPU_SUCCESS) {
cur_size_.fetch_add(size);
return result;
} else {
RaiseNonOutOfMemoryError(result);
// Non out of memory error would be raised inside
// RaiseNonOutOfMemoryError. Therefore, we can
// return XPUERR_NOMEM directly here.
return XPUERR_NOMEM;
}
}
/**
   * Free XPU memory. Usually, free is not allowed to raise an error;
   * if it does, the process should crash.
*/
void Free(void* ptr, size_t size) {
XPUDeviceGuard guard(dev_id_);
xpu_free(ptr);
cur_size_.fetch_sub(size);
}
inline bool NeedRecord() const { return limit_size_ != 0; }
uint64_t RecordedSize() const { return cur_size_.load(); }
uint64_t LimitSize() const { return limit_size_; }
private:
const int dev_id_;
const uint64_t limit_size_;
std::atomic<uint64_t> cur_size_{0};
mutable std::unique_ptr<std::mutex> mtx_;
static std::once_flag once_flag_;
static std::vector<std::unique_ptr<RecordedXPUMallocHelper>> instances_;
};
std::once_flag RecordedXPUMallocHelper::once_flag_;
std::vector<std::unique_ptr<RecordedXPUMallocHelper>>
RecordedXPUMallocHelper::instances_;
int RecordedXPUMalloc(void** ptr, size_t size, int dev_id) {
return RecordedXPUMallocHelper::Instance(dev_id)->Malloc(ptr, size);
}
void RecordedXPUFree(void* p, size_t size, int dev_id) {
return RecordedXPUMallocHelper::Instance(dev_id)->Free(p, size);
}
uint64_t RecordedXPUMallocSize(int dev_id) {
return RecordedXPUMallocHelper::Instance(dev_id)->RecordedSize();
}
uint64_t RecordedXPULimitSize(int dev_id) {
return RecordedXPUMallocHelper::Instance(dev_id)->LimitSize();
}
bool IsXPUMallocRecorded(int dev_id) {
return RecordedXPUMallocHelper::Instance(dev_id)->NeedRecord();
}
} // namespace platform
} // namespace paddle
......@@ -70,6 +70,22 @@ using XPUDeviceGuard = phi::backends::xpu::XPUDeviceGuard;
phi::backends::xpu::XPUVersion get_xpu_version(int dev_id);
//! Get the minimum chunk size for XPU allocator.
size_t XPUMinChunkSize();
//! xpu_malloc with recorded info
int RecordedXPUMalloc(void **ptr, size_t size, int dev_id);
//! xpu_free with recorded info
void RecordedXPUFree(void *p, size_t size, int dev_id);
//! Get recorded xpu_malloc size. If recording is disabled, return 0.
uint64_t RecordedXPUMallocSize(int dev_id);
uint64_t RecordedXPULimitSize(int dev_id);
bool IsXPUMallocRecorded(int dev_id);
} // namespace platform
} // namespace paddle
#endif
......@@ -21,6 +21,7 @@ limitations under the License. */
#include "paddle/phi/core/errors.h"
#include "paddle/phi/backends/gpu/gpu_info.h"
#include "paddle/phi/backends/xpu/xpu_info.h"
namespace phi {
......@@ -38,7 +39,7 @@ inline size_t Alignment(size_t size,
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
alignment = phi::backends::gpu::GpuMinChunkSize();
#elif defined(PADDLE_WITH_XPU)
alignment = alignment;
alignment = phi::backends::xpu::XPUMinChunkSize();
#else
PADDLE_THROW(phi::errors::PreconditionNotMet(
"Fluid is not compiled with CUDA/XPU."));
......
......@@ -206,6 +206,16 @@ DEFINE_EXTERNAL_API_TYPE(BKCLResult_t, BKCL_SUCCESS);
__THROW_ERROR_INTERNAL__(__summary__); \
} \
} while (0)
#define PADDLE_ENFORCE_XRE_SUCCESS(COND) \
do { \
auto __cond__ = (COND); \
auto xre_msg = xpu_strerror(__cond__); \
if (UNLIKELY(__cond__ != XPU_SUCCESS)) { \
auto __summary__ = \
phi::errors::External("XPU Runtime Error: ", xre_msg); \
__THROW_ERROR_INTERNAL__(__summary__); \
} \
} while (0)
} // namespace xpu
} // namespace backends
......
......@@ -59,22 +59,24 @@ struct XPUContext::Impl {
}
}
bool IsDataloader() const {
if (std::getenv("XPU_PADDLE_XDL_CONTEXTS") == nullptr) {
return false;
}
std::string cur_thread_name = phi::GetCurrentThreadName();
VLOG(3) << "XPU Dataloader: current thread at Get Context = "
<< phi::GetCurrentThreadName();
bool is_dataloader_thread = (cur_thread_name != "MainThread");
return is_dataloader_thread;
}
Impl() : place_(XPUPlace()) {}
explicit Impl(const Place& place) : place_(place) {}
~Impl() {
for (auto& ctx_it : context_map_) {
auto& ctx = ctx_it.second;
if (ctx != nullptr) {
xpu_wait(ctx->xpu_stream);
if (ctx->xpu_stream) {
xpu_stream_destroy(ctx->xpu_stream);
ctx->xpu_stream = nullptr;
}
ctx = nullptr;
}
}
context_map_.clear();
if (owned_ && context_ != nullptr) {
backends::xpu::XPUDeviceGuard guard(place_.GetDeviceId());
xpu_wait(context_->xpu_stream);
......@@ -87,27 +89,13 @@ struct XPUContext::Impl {
xpu::destroy_context(context_);
context_ = nullptr;
}
if (std::getenv("XPU_PADDLE_XDL_CONTEXTS") != nullptr) {
// destroy all XPU Dataloader threads if exist
backends::xpu::XPUDeviceGuard guard(place_.GetDeviceId());
for (auto ctx : GetAllXdlCtxs()) {
xpu_wait(ctx->xpu_stream);
if (ctx->xpu_stream) {
xpu_stream_destroy(ctx->xpu_stream);
ctx->xpu_stream = nullptr;
}
xpu::destroy_context(ctx);
ctx = nullptr;
}
xdl_context_map_.clear();
}
}
const Place& GetPlace() const { return place_; }
XPUStream stream() const {
if (IsDataloader()) {
xpu::Context* ctx_t = GetXdlCtx();
xpu::Context* ctx_t = GetXdlCtx();
if (ctx_t) {
return ctx_t->xpu_stream;
}
return context_->xpu_stream;
......@@ -132,10 +120,10 @@ struct XPUContext::Impl {
// Overload GetXContext function to set and get
// contexts of XPU Dataloader threads, and keep old GetXContext Method
xpu::Context* GetXContext() {
if (IsDataloader()) {
SetXdlCtx();
xpu::Context* ctx_t = GetXdlCtx();
PD_CHECK(ctx_t != nullptr, "the xpu dataloader context is nullptr.");
SetXdlCtx();
xpu::Context* ctx_t = GetXdlCtx();
if (ctx_t) {
PD_CHECK(ctx_t != nullptr, "the xpu context is nullptr.");
return ctx_t;
}
......@@ -144,21 +132,12 @@ struct XPUContext::Impl {
}
void Wait() const {
if (IsDataloader()) {
xpu::Context* ctx_t = GetXdlCtx();
if (ctx_t) {
PD_CHECK(ctx_t != nullptr, "the xpu dataloader context is nullptr.");
xpu_wait(ctx_t->xpu_stream);
}
return;
}
backends::xpu::XPUDeviceGuard guard(place_.GetDeviceId());
PD_CHECK(context_ != nullptr, "the xpu context is nullptr.");
xpu_wait(context_->xpu_stream);
xpu::Context* ctx_t = GetXdlCtx();
if (ctx_t) {
PD_CHECK(ctx_t != nullptr, "the xpu dataloader context is nullptr.");
PD_CHECK(ctx_t != nullptr, "the xpu context is nullptr.");
xpu_wait(ctx_t->xpu_stream);
}
}
......@@ -169,10 +148,6 @@ struct XPUContext::Impl {
LOG_FIRST_N(WARNING, 1)
<< "Please NOTE: xpu device: " << static_cast<int>(place_.device);
context_ = xpu::create_context();
if (std::getenv("XPU_PADDLE_XDL_CONTEXTS") != nullptr) {
// Initialize XPU Dataloader threads contexts map
InitializeXdlContexts();
}
xpu_version_ = backends::xpu::get_xpu_version(place_.device);
SetL3Cache();
}
......@@ -190,43 +165,29 @@ struct XPUContext::Impl {
stream_owned_ = true;
}
// Methods of XPU Dataloader threads contexts map,
// currently, need set 'export XPU_PADDLE_XDL_CONTEXTS=1'
// to open XPU Dataloader context map
void InitializeXdlContexts() {
if (std::getenv("XPU_PADDLE_XDL_CONTEXTS") == nullptr) {
return;
}
auto thread_map = phi::GetAllThreadNames();
for (const auto& tp : thread_map) {
std::string t_name = tp.second;
if (t_name.substr(0, 10) == "Dataloader") {
SetXdlCtx();
}
}
}
void SetXdlCtx() {
auto pid = phi::GetProcessId();
if (xdl_context_map_.find(pid) == xdl_context_map_.end()) {
std::string tname = phi::GetCurrentThreadName();
if (tname.substr(0, 10) == "Dataloader" &&
context_map_.find(tname) == context_map_.end()) {
VLOG(4) << "Set XPU Dataloader Context with current thread name = "
<< tname << " currently " << context_map_.size()
<< " contexts existing";
xpu::Context* ctx_t = xpu::create_context();
xdl_context_map_[pid] = ctx_t;
context_map_[tname] = ctx_t;
}
}
xpu::Context* GetXdlCtx() const {
auto pid = phi::GetProcessId();
return (xdl_context_map_.find(pid) == xdl_context_map_.end())
? nullptr
: xdl_context_map_.find(pid)->second;
}
std::vector<xpu::Context*> GetAllXdlCtxs() {
std::vector<xpu::Context*> ctxs;
for (const auto& it : xdl_context_map_) {
ctxs.emplace_back(it.second);
std::string tname = phi::GetCurrentThreadName();
VLOG(4) << "Get XPU Context with current thread name = " << tname
<< " currently " << context_map_.size() << " contexts existing";
if (tname.substr(0, 10) != "Dataloader") {
return context_;
} else {
return (context_map_.find(tname) == context_map_.end())
? nullptr
: context_map_.find(tname)->second;
}
return ctxs;
}
bool owned_{false};
......@@ -236,7 +197,7 @@ struct XPUContext::Impl {
int runtime_version_;
int driver_version_;
xpu::Context* context_{nullptr};
std::unordered_map<uint32_t, xpu::Context*> xdl_context_map_;
std::unordered_map<std::string, xpu::Context*> context_map_;
// NOTE: Distributed communicator, distributed framework manages its
// resources, XPUContext only holds references.
......@@ -290,8 +251,6 @@ void XPUContext::SetXContext(xpu::Context* context) {
void XPUContext::SetL3Cache(int l3_size) { impl_->SetL3Cache(l3_size); }
bool XPUContext::IsDataloader() const { return impl_->IsDataloader(); }
void XPUContext::SetBkclContext(xpu::BKCLContext_t context) {
impl_->SetBkclContext(context);
}
......
......@@ -47,10 +47,6 @@ class XPUContext : public DeviceContext,
xpu::Context* x_context() const;
// For multi-thread dataloader,
// check if the current thread is Dataloader thread
bool IsDataloader() const;
// Return bkcl context.
xpu::BKCLContext_t bkcl_context() const;
void SetBkclContext(xpu::BKCLContext_t context);
......
......@@ -51,4 +51,22 @@ class XPUTypeTrait<phi::dtype::bfloat16> {
using Type = bfloat16;
};
template <typename T>
class XPUTypeToPhiType {
public:
using Type = T;
};
template <>
class XPUTypeToPhiType<float16> {
public:
using Type = phi::dtype::float16;
};
template <>
class XPUTypeToPhiType<bfloat16> {
public:
using Type = phi::dtype::bfloat16;
};
#endif
......@@ -45,6 +45,11 @@ int GetXPUCurrentDeviceId();
std::vector<int> GetXPUSelectedDevices();
/***** Memory Management *****/
//! Get the minimum chunk size for XPU buddy allocator.
inline size_t XPUMinChunkSize() {
  // The minimum chunk size allowed to be allocated is 64 bytes.
return 1 << 6;
}
//! Copy memory from address src to dst synchronously.
void MemcpySyncH2D(void *dst,
......
......@@ -542,7 +542,7 @@ PHI_DEFINE_EXPORTED_double(
// NOTE(zhiqiu): better to share the flags, otherwise we will have too many
// flags.
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) || \
defined(PADDLE_WITH_CUSTOM_DEVICE)
defined(PADDLE_WITH_CUSTOM_DEVICE) || defined(PADDLE_WITH_XPU)
/**
* Memory related FLAG
......
......@@ -17,6 +17,7 @@
#include "paddle/phi/backends/xpu/enforce_xpu.h"
#include "paddle/phi/core/kernel_registry.h"
#include "paddle/phi/kernels/funcs/math_function.h"
#include "paddle/phi/kernels/xpu/xpu_mem_util.h"
namespace phi {
template <typename T, typename Context>
......@@ -59,8 +60,9 @@ void TopkKernel(const Context& dev_ctx,
size_t k = k_scalar.to<int>();
if (axis + 1 == in_dims.size()) {
xpu::ctx_guard RAII_GUARD(dev_ctx.x_context());
int32_t* indices_int_data =
RAII_GUARD.alloc_l3_or_gm<int32_t>(indices->numel());
int32_t* indices_int_data = Alloc_l3_or_gm<Context, int32_t>(
dev_ctx, &RAII_GUARD, indices->numel());
PADDLE_ENFORCE_XDNN_NOT_NULL(indices_int_data);
const size_t row =
phi::product(phi::slice_ddim(in_dims, 0, in_dims.size() - 1));
......@@ -104,7 +106,9 @@ void TopkKernel(const Context& dev_ctx,
}
xpu::ctx_guard RAII_GUARD(dev_ctx.x_context());
XPUType* trans_in_data = RAII_GUARD.alloc_l3_or_gm<XPUType>(x.numel());
XPUType* trans_in_data =
Alloc_l3_or_gm<Context, XPUType>(dev_ctx, &RAII_GUARD, x.numel());
PADDLE_ENFORCE_XDNN_NOT_NULL(trans_in_data);
// Transpose and save interval output to trans_in
int r = xpu::transpose<XPUType>(dev_ctx.x_context(),
......@@ -119,10 +123,17 @@ void TopkKernel(const Context& dev_ctx,
r,
XPUAPIErrorMsg[r]));
XPUType* trans_out_data = RAII_GUARD.alloc_l3_or_gm<XPUType>(out->numel());
int64_t* trans_idx_data = RAII_GUARD.alloc_l3_or_gm<int64_t>(out->numel());
XPUType* trans_out_data =
Alloc_l3_or_gm<Context, XPUType>(dev_ctx, &RAII_GUARD, out->numel());
PADDLE_ENFORCE_XDNN_NOT_NULL(trans_out_data);
int64_t* trans_idx_data =
Alloc_l3_or_gm<Context, int64_t>(dev_ctx, &RAII_GUARD, out->numel());
PADDLE_ENFORCE_XDNN_NOT_NULL(trans_idx_data);
int32_t* trans_idx_int32_data =
RAII_GUARD.alloc_l3_or_gm<int32_t>(out->numel());
Alloc_l3_or_gm<Context, int32_t>(dev_ctx, &RAII_GUARD, out->numel());
PADDLE_ENFORCE_XDNN_NOT_NULL(trans_idx_int32_data);
const size_t row =
phi::product(phi::slice_ddim(trans_dims, 0, trans_dims.size() - 1));
const size_t col = trans_dims[trans_dims.size() - 1];
......
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef PADDLE_WITH_XPU
#include <vector>
#include "paddle/phi/backends/xpu/enforce_xpu.h"
#include "paddle/phi/backends/xpu/xpu_header.h"
#include "paddle/phi/backends/xpu/xpu_info.h"
#include "paddle/phi/core/dense_tensor.h"
#include "paddle/phi/core/device_context.h"
namespace phi {
template <typename T>
T* Alloc_l3(xpu::ctx_guard* RAII_GUARD, const int64_t n) {
T* ret = RAII_GUARD->alloc_l3<T>(n);
return ret;
}
template <typename T, typename Context>
T* Alloc_gm(const Context& dev_ctx, const int64_t n) {
DenseTensor ret_tensor;
DDim d({n});
ret_tensor.Resize(d);
T* ret = dev_ctx.template Alloc<T>(&ret_tensor);
return ret;
}
template <typename Context, typename XPUT>
XPUT* Alloc_l3_or_gm(const Context& dev_ctx,
xpu::ctx_guard* RAII_GUARD,
const int64_t n) {
XPUT* ret = Alloc_l3<XPUT>(RAII_GUARD, n);
if (ret != nullptr) {
return ret;
}
using T = typename XPUTypeToPhiType<XPUT>::Type;
ret = reinterpret_cast<XPUT*>(Alloc_gm<T, Context>(dev_ctx, n));
return ret;
}
} // namespace phi
#endif
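
The helper above first tries the kernel's L3 scratch allocator and, if that fails, falls back to a DenseTensor allocation in global memory through the device context; with the auto-growth allocator in place, that fallback goes through the same growth-and-reuse path as regular tensor memory. A hedged usage sketch follows; the kernel name and buffer size are made up, but the call pattern matches the top_k_kernel.cc change above.

// Hypothetical kernel-side usage of Alloc_l3_or_gm (DemoScratchAlloc and its
// arguments are illustrative, not part of the diff).
#include "paddle/phi/kernels/xpu/xpu_mem_util.h"

namespace phi {

template <typename Context, typename XPUT>
void DemoScratchAlloc(const Context& dev_ctx, int64_t n) {
  xpu::ctx_guard RAII_GUARD(dev_ctx.x_context());
  // Prefer L3 cache; fall back to a global-memory DenseTensor allocation.
  XPUT* buf = Alloc_l3_or_gm<Context, XPUT>(dev_ctx, &RAII_GUARD, n);
  PADDLE_ENFORCE_XDNN_NOT_NULL(buf);
  // ... hand buf to XDNN calls as scratch space ...
}

}  // namespace phi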