未验证 提交 99399f32 编写于 作者: C csy0225 提交者: GitHub

XPU Support external stream (#53334)

上级 eda8df71
......@@ -334,6 +334,26 @@ bool AnalysisPredictor::Init(
InitDeviceContexts();
}
}
#endif
#if defined(PADDLE_WITH_XPU)
if (config_.use_xpu_ && config_.use_external_stream_) {
private_context_ = true;
}
if (private_context_) {
if (!status_is_cloned_) {
predictor_stream_ = config_.GetExecStream();
}
// NOTE: If the external_stream equals to global_device_contexts's stream,
// then fallback.
auto global_stream =
static_cast<phi::XPUContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
if (predictor_stream_ != global_stream) {
InitResourceManager(predictor_stream_);
InitDeviceContexts();
}
}
#endif
inference::DisplayMemoryInfo(place_, "Init predictor");
return true;
......@@ -418,6 +438,9 @@ void AnalysisPredictor::InitResourceManager(void *stream) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
predictor_stream_ =
ResourceManager::Instance().InitGPUResource(place_, stream);
#elif defined(PADDLE_WITH_XPU)
predictor_stream_ =
ResourceManager::Instance().InitXPUResource(place_, stream);
#endif
}
......@@ -487,6 +510,32 @@ void AnalysisPredictor::InitDeviceContexts() {
return std::unique_ptr<phi::DeviceContext>(gpu_context);
}));
}
#endif
#if defined(PADDLE_WITH_XPU)
if (place_.GetType() == phi::AllocationType::XPU) {
device_contexts_.emplace(
place_, std::async(std::launch::deferred, [=] {
auto *xpu_resource =
ResourceManager::Instance().GetXPUResource(predictor_stream_);
auto &instance = memory::allocation::AllocatorFacade::Instance();
auto *xpu_context = new InferXPUContext(place_);
xpu_context->SetAllocator(instance.GetAllocator(place_).get());
xpu_context->SetGenerator(
phi::DefaultXPUGenerator(place_.GetDeviceId()).get());
xpu_context->SetHostAllocator(
instance.GetAllocator(platform::CPUPlace()).get());
xpu_context->SetHostGenerator(phi::DefaultCPUGenerator().get());
xpu_context->SetZeroAllocator(
instance.GetZeroAllocator(place_).get());
xpu_context->SetHostZeroAllocator(
instance.GetZeroAllocator(platform::CPUPlace()).get());
xpu_context->SetStream(xpu_resource->GetStream());
xpu_context->SetDriverVersion(xpu_resource->GetDriverVersion());
xpu_context->SetRuntimeVersion(xpu_resource->GetRuntimeVersion());
xpu_context->SetXpuVersion(xpu_resource->GetXpuVersion());
return std::unique_ptr<phi::DeviceContext>(xpu_context);
}));
}
#endif
// TODO(Inference): Support other backends.
}
......@@ -506,10 +555,14 @@ void *AnalysisPredictor::GetExecStream() const {
#endif
#if defined(PADDLE_WITH_XPU)
if (place_.GetType() == phi::AllocationType::XPU) {
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
return reinterpret_cast<const phi::XPUContext *>(pool.Get(place_))
->stream();
if (private_context_) {
return predictor_stream_;
} else {
paddle::platform::DeviceContextPool &pool =
paddle::platform::DeviceContextPool::Instance();
return reinterpret_cast<const phi::XPUContext *>(pool.Get(place_))
->stream();
}
}
#endif
// TODO(inference): Support other backends.
......@@ -2050,6 +2103,33 @@ bool AnalysisPredictor::ExpRunWithExternalStream(const gpuStream_t stream) {
}
#endif
// Runs inference using an externally supplied XPU stream (XPU counterpart of
// the gpuStream_t overload). Requires the predictor to have been configured
// with an external stream via config.SetExecStream, which sets
// private_context_ during Init.
bool AnalysisPredictor::ExpRunWithExternalStream(void *stream) {
#if defined(PADDLE_WITH_XPU)
  if (!private_context_) {
    // Without a private context there are no per-predictor resources to bind
    // the stream to, so fail fast instead of silently running on the global
    // device context's stream.
    PADDLE_THROW(platform::errors::Fatal(
        "Please use config.SetExecStream to init resources, and then we "
        "will bind resources to execution stream."));
  }
  if (stream != predictor_stream_) {
    // Drain all work queued on the old stream before handing the predictor's
    // resources over to the new one.
    paddle::platform::XPUStreamSync(
        static_cast<paddle::xpuStream>(predictor_stream_));
    ResourceManager::Instance().XpuResourceReBindStream(predictor_stream_,
                                                        stream);
    predictor_stream_ = stream;

    // Re-point the cached per-predictor InferXPUContext at the new stream.
    auto *dev_ctxs = reinterpret_cast<const std::map<
        phi::Place,
        std::shared_future<std::unique_ptr<phi::DeviceContext>>> *>(
        this->GetDeviceContexts());
    auto *dev_ctx =
        static_cast<InferXPUContext *>(dev_ctxs->at(place_).get().get());
    dev_ctx->SetStream(stream);
  }
  return ZeroCopyRun();
#endif
  // Reached only when built without XPU support.
  return false;
}
void AnalysisPredictor::CollectShapeRangeInfo() {
// if use gpu, sync first.
paddle::platform::DeviceContextPool &pool =
......@@ -2413,7 +2493,12 @@ AnalysisPredictor::~AnalysisPredictor() {
if (predictor_stream_ != nullptr) {
ResourceManager::Instance().DestroyGPUResource(predictor_stream_);
}
#elif defined(PADDLE_WITH_XPU)
if (predictor_stream_ != nullptr) {
ResourceManager::Instance().DestroyXPUResource(predictor_stream_);
}
#endif
if (place_.GetType() != phi::AllocationType::UNDEFINED) {
memory::Release(place_);
}
......@@ -2922,6 +3007,11 @@ bool InternalUtils::RunWithExternalStream(paddle_infer::Predictor *p,
#endif
return false;
}
// Runs the wrapped predictor with an externally created device stream
// (generic void* overload, used for the XPU path).
//
// Fix: the dynamic_cast result was dereferenced unconditionally; if the
// wrapped predictor is not an AnalysisPredictor the cast yields nullptr and
// the call would crash. Return false in that case instead, matching the
// function's "false on failure" contract.
bool InternalUtils::RunWithExternalStream(paddle_infer::Predictor *p,
                                          void *stream) {
  auto *pred = dynamic_cast<paddle::AnalysisPredictor *>(p->predictor_.get());
  if (pred == nullptr) {
    return false;
  }
  return pred->ExpRunWithExternalStream(stream);
}
void InternalUtils::UpdateConfigInterleaved(paddle_infer::Config *c,
bool with_interleaved) {
......
......@@ -225,6 +225,9 @@ class AnalysisPredictor : public PaddlePredictor {
bool ExpRunWithExternalStream(const gpuStream_t stream);
#endif
// Note: Can only be used under thread_local semantics.
bool ExpRunWithExternalStream(void *stream);
///
/// \brief Get the execution stream on devices with a concept of stream,
/// otherwise returns nullptr.
......
......@@ -22,4 +22,9 @@ InferGPUContext::InferGPUContext(const phi::Place& place)
: phi::GPUContext(place, false) {}
#endif
#if defined(PADDLE_WITH_XPU)
// Thin constructor: the inference-side XPU context is a plain phi::XPUContext;
// InferXPUContext exists only to re-expose protected setters (see its
// class declaration).
InferXPUContext::InferXPUContext(const phi::Place& place)
    : phi::XPUContext(place) {}
#endif
} // namespace paddle
......@@ -45,4 +45,15 @@ class InferGPUContext : public phi::GPUContext {
using phi::GPUContext::SetRuntimeVersion;
};
#endif
#if defined(PADDLE_WITH_XPU)
// Inference-private XPU context. Publicly re-exports protected
// phi::XPUContext setters so the predictor can stamp the stream and
// driver/runtime/version info captured by the ResourceManager onto a freshly
// constructed context.
class InferXPUContext : public phi::XPUContext {
 public:
  explicit InferXPUContext(const phi::Place& place);
  using phi::XPUContext::SetDriverVersion;
  using phi::XPUContext::SetRuntimeVersion;
  using phi::XPUContext::SetStream;
  using phi::XPUContext::SetXpuVersion;
};
#endif
} // namespace paddle
......@@ -480,7 +480,8 @@ class PD_INFER_DECL InternalUtils {
cudaStream_t stream);
static bool RunWithExternalStream(paddle_infer::Predictor* pred,
hipStream_t stream);
static bool RunWithExternalStream(paddle_infer::Predictor* pred,
void* stream);
static void UpdateConfigInterleaved(paddle_infer::Config* c,
bool with_interleaved);
......
......@@ -41,6 +41,9 @@
#include "paddle/phi/backends/dynload/cusparse.h"
#endif // PADDLE_WITH_CUDA
#ifdef PADDLE_WITH_XPU
#include "paddle/phi/backends/xpu/xpu_info.h"
#endif
namespace paddle {
namespace internal {
......@@ -545,4 +548,121 @@ int ResourceManager::RefCount(void* stream) const {
}
#endif
#if defined(PADDLE_WITH_XPU)
// XPUContextResource
// Wraps an externally created XPU stream for `place` and snapshots the
// device's driver/runtime/version properties.
XPUContextResource::XPUContextResource(const phi::Place& place, void* stream)
    : place_(place) {
  InitXPUResource(stream);
}

// No-op: the stream is externally owned (see InitXPUResource), so there is
// nothing to release here.
XPUContextResource::~XPUContextResource() {}
// Binds this resource to the given external stream (when non-null) and
// queries the device properties for place_.
// NOTE(review): when `stream` is null, no stream is created and stream_ is
// never assigned here, while owned_stream_ stays true — confirm callers
// always pass a valid external stream.
void XPUContextResource::InitXPUResource(void* stream) {
  // Make sure all queries below target place_'s device.
  phi::backends::xpu::XPUDeviceGuard guard(place_.device);
  if (stream) {
    owned_stream_ = false;
    stream_ = stream;
  }
  InitXpuProperties();
}
// Caches the driver version, runtime version and XPU generation of place_'s
// device so they can later be copied onto an InferXPUContext.
void XPUContextResource::InitXpuProperties() {
  phi::backends::xpu::XPUDeviceGuard guard(place_.device);
  driver_version_ = phi::backends::xpu::GetDriverVersion();
  runtime_version_ = phi::backends::xpu::GetRuntimeVersion();
  xpu_version_ =
      static_cast<int>(phi::backends::xpu::get_xpu_version(place_.device));
}
// Trivial accessors for the cached stream pointer and device properties.
void* XPUContextResource::GetStream() const { return stream_; }
int XPUContextResource::GetDriverVersion() const { return driver_version_; }
int XPUContextResource::GetRuntimeVersion() const { return runtime_version_; }
int XPUContextResource::GetXpuVersion() const { return xpu_version_; }
// Points this resource at a different external stream. The new stream is
// never owned; synchronizing the previously bound stream is the caller's
// responsibility (see ResourceManager::XpuResourceReBindStream).
void XPUContextResource::ReBindStream(void* stream) {
  owned_stream_ = false;
  stream_ = stream;
}
// XPUContextResource End.
// Resource Manager
// Registers `stream` with the manager (creating an XPUContextResource on
// first sight) and returns the key under which the resource is tracked.
// Repeated registration of the same stream only bumps its reference count.
void* ResourceManager::InitXPUResource(const phi::Place& place, void* stream) {
  std::lock_guard<std::mutex> guard(xpu_mutex_);

  // Already tracked: just add one more reference.
  if (xpu_resources_.count(stream) > 0) {
    Increase(stream);
    return stream;
  }

  // First registration: build the resource and record it under its stream.
  auto resource = std::make_unique<XPUContextResource>(place, stream);
  void* key = resource->GetStream();
  ref_count_[key] = 1;
  xpu_resources_.emplace(key, std::move(resource));
  return key;
}
// Returns the resource registered for `stream`; throws InvalidArgument if
// the stream was never registered via InitXPUResource.
// NOTE(review): unlike InitXPUResource this lookup is not guarded by
// xpu_mutex_ — confirm callers only use it after registration has settled.
XPUContextResource* ResourceManager::GetXPUResource(void* stream) const {
  PADDLE_ENFORCE_EQ(xpu_resources_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in xpu_resources.", stream));
  return xpu_resources_.at(stream).get();
}
// Moves the resource entry keyed by `old_stream` over to `new_stream`.
// The caller is expected to have synchronized old_stream beforehand (see
// AnalysisPredictor::ExpRunWithExternalStream).
void ResourceManager::XpuResourceReBindStream(void* old_stream,
                                              void* new_stream) {
  PADDLE_ENFORCE_EQ(
      xpu_resources_.count(old_stream),
      true,
      platform::errors::InvalidArgument(
          "The stream[%p] not found in xpu_resources.", old_stream));
  // Take ownership of the entry before the map slot is erased below.
  auto xpu_resource = std::move(xpu_resources_.at(old_stream));
  DestroyXPUResource(old_stream);
  // DestroyXPUResource decrements the ref count and, at zero, erases both
  // map entries. A surviving count here means other holders still reference
  // old_stream and the rebind would be unsafe.
  PADDLE_ENFORCE_EQ(
      ref_count_.count(old_stream),
      0,
      platform::errors::Fatal("xpu resources rebind stream failed."));
  xpu_resource->ReBindStream(new_stream);
  // NOTE(review): if new_stream is already registered, the emplace below is
  // a no-op (the moved resource is dropped) while ref_count_ is still
  // incremented — presumably new_stream is always fresh here; verify.
  ref_count_[new_stream]++;
  xpu_resources_.emplace(new_stream, std::move(xpu_resource));
}
// Releases one reference on `stream`; the underlying XPUContextResource is
// destroyed only when the last reference is dropped (see Decrease).
void ResourceManager::DestroyXPUResource(void* stream) {
  PADDLE_ENFORCE_EQ(xpu_resources_.count(stream),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in xpu_resources.", stream));
  Decrease(stream);
}
// Drops one reference on `stream`; when the count reaches zero, removes the
// count entry and destroys the associated XPUContextResource.
// Throws InvalidArgument for an untracked stream.
void ResourceManager::Decrease(void* stream) {
  auto it = ref_count_.find(stream);
  PADDLE_ENFORCE_EQ(it != ref_count_.end(),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in ref_count.", stream));
  if (--(it->second) == 0) {
    ref_count_.erase(it);
    xpu_resources_.erase(stream);
  }
}
// Adds one reference on `stream`. Throws InvalidArgument for an untracked
// stream; counts are only created by InitXPUResource / rebind.
void ResourceManager::Increase(void* stream) {
  auto it = ref_count_.find(stream);
  PADDLE_ENFORCE_EQ(it != ref_count_.end(),
                    true,
                    platform::errors::InvalidArgument(
                        "The stream[%p] not found in ref_count.", stream));
  ++(it->second);
}
// Returns the current reference count for `stream`, or 0 when the stream is
// not tracked at all.
int ResourceManager::RefCount(void* stream) const {
  auto it = ref_count_.find(stream);
  return (it == ref_count_.end()) ? 0 : static_cast<int>(it->second);
}
// Resource Manager End.
#endif
} // namespace paddle
......@@ -134,6 +134,33 @@ class GPUContextResource {
};
#endif
#if defined(PADDLE_WITH_XPU)
// Bookkeeping for one externally supplied XPU stream: remembers the stream
// pointer, the place it belongs to, and the device's driver/runtime/version
// properties so a per-predictor InferXPUContext can be configured from it.
//
// Fix: stream_ and the version members had no initializers. InitXPUResource
// only assigns stream_ when a non-null external stream is passed, so the
// members could otherwise be read uninitialized (e.g. via GetStream()).
// Brace-initialize them; behavior with a valid stream is unchanged.
class XPUContextResource {
 public:
  explicit XPUContextResource(const phi::Place& place, void* stream);
  ~XPUContextResource();
  phi::Place Place() const;

  void* GetStream() const;
  int GetDriverVersion() const;
  int GetRuntimeVersion() const;
  int GetXpuVersion() const;
  // Re-points this resource at another external stream; ownership stays with
  // the caller.
  void ReBindStream(void* stream);

 private:
  void InitXPUResource(void* stream);
  void InitXpuProperties();

 private:
  bool owned_stream_{true};
  void* stream_{nullptr};
  phi::Place place_;
  int driver_version_{0};
  int runtime_version_{0};
  int xpu_version_{0};
};  // class XPUContextResource
#endif
class ResourceManager {
public:
ResourceManager() = default;
......@@ -173,6 +200,28 @@ class ResourceManager {
gpu_resources_;
#endif
// XPU Resource
#if defined(PADDLE_WITH_XPU)
public:
void* InitXPUResource(const phi::Place& place, void* stream);
void DestroyXPUResource(void* stream);
XPUContextResource* GetXPUResource(void* stream) const;
int RefCount(void* stream) const;
void XpuResourceReBindStream(void* old_stream, void* new_stream);
private:
void Decrease(void* stream);
void Increase(void* stream);
private:
std::mutex xpu_mutex_;
// a stream corresponding to a series of resource.
std::map<void* /*stream*/, std::atomic<int>> ref_count_;
std::map<void* /*stream*/, std::unique_ptr<XPUContextResource>>
xpu_resources_;
#endif
private:
DISABLE_COPY_AND_ASSIGN(ResourceManager);
};
......
......@@ -76,7 +76,7 @@ struct XPUContext::Impl {
if (owned_ && context_ != nullptr) {
backends::xpu::XPUDeviceGuard guard(place_.GetDeviceId());
xpu_wait(context_->xpu_stream);
if (context_->xpu_stream) {
if (context_->xpu_stream && stream_owned_) {
// manually destroy XPUStream here until xpu::api integrates this work
// into Context dtor
xpu_stream_destroy(context_->xpu_stream);
......@@ -111,6 +111,12 @@ struct XPUContext::Impl {
return context_->xpu_stream;
}
  // Adopt an externally owned stream for this context. stream_owned_ is
  // cleared so the Impl destructor will not destroy a stream it does not
  // own (the dtor only calls xpu_stream_destroy when stream_owned_ is set).
  void SetStream(void* stream) {
    stream_owned_ = false;
    context_->set_stream(static_cast<XPUStream>(stream));
  }
xpu::Context* GetXContext() const {
PD_CHECK(context_ != nullptr, "the xpu context is nullptr.");
return context_;
......@@ -179,6 +185,7 @@ struct XPUContext::Impl {
return;
}
PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_create(&context_->xpu_stream));
stream_owned_ = true;
}
// Methods of XPU Dataloader threads contexts map,
......@@ -221,8 +228,11 @@ struct XPUContext::Impl {
}
bool owned_{false};
bool stream_owned_{false};
Place place_;
backends::xpu::XPUVersion xpu_version_;
int runtime_version_;
int driver_version_;
xpu::Context* context_{nullptr};
std::unordered_map<uint32_t, xpu::Context*> xdl_context_map_;
......@@ -246,6 +256,20 @@ const Place& XPUContext::GetPlace() const { return impl_->GetPlace(); }
XPUStream XPUContext::stream() const { return impl_->stream(); }
// Forward external-stream adoption to the pimpl (marks the stream unowned).
void XPUContext::SetStream(void* stream) { impl_->SetStream(stream); }
// The setters below let inference code copy device properties captured by
// the resource manager into this context; they write the Impl's fields
// directly.
void XPUContext::SetXpuVersion(int version) {
  impl_->xpu_version_ = static_cast<backends::xpu::XPUVersion>(version);
}
void XPUContext::SetRuntimeVersion(int version) {
  impl_->runtime_version_ = version;
}
void XPUContext::SetDriverVersion(int version) {
  impl_->driver_version_ = version;
}
backends::xpu::XPUVersion XPUContext::xpu_version() const {
  return impl_->xpu_version_;
}
......
......@@ -56,6 +56,9 @@ class XPUContext : public DeviceContext,
void SetBkclContext(xpu::BKCLContext_t context);
void CreateStream();
// For share external stream.
void SetStream(void* stream);
// Wait for all operations completion in the stream.
void Wait() const override;
......@@ -73,6 +76,12 @@ class XPUContext : public DeviceContext,
void SetL3Cache(int l3_size = 14155776);
void SetXpuVersion(int version);
void SetRuntimeVersion(int runtime_version);
void SetDriverVersion(int driver_version);
Eigen::DefaultDevice* eigen_device() const { return nullptr; }
XPUStream stream() const;
......
......@@ -17,6 +17,10 @@
#if defined(PADDLE_WITH_CUDA)
#include <cuda_runtime.h>
#endif
#if defined(PADDLE_WITH_XPU)
#include "xpu/runtime.h"
#include "xpu/xdnn.h"
#endif
#include <glog/logging.h>
#include <gtest/gtest.h>
......@@ -654,6 +658,57 @@ TEST(Predictor, Streams) {
}
#endif
#if defined(PADDLE_WITH_XPU)
TEST(Predictor, XPUStreams) {
  // Case 1: a predictor bound to a user-created external stream must report
  // that same stream back and hold exactly one resource-manager reference.
  {
    auto context = baidu::xpu::api::create_context();
    xpu_stream_create(&context->xpu_stream);

    Config config;
    config.SetModel(FLAGS_dirname);
    config.EnableXpu();
    config.SetExecStream(static_cast<void*>(context->xpu_stream));
    CHECK_EQ(config.external_stream_enabled(), true);

    auto predictor = CreatePredictor(config);
    auto stream = predictor->GetExecStream();
    CHECK_EQ(static_cast<void*>(context->xpu_stream), stream);
    CHECK_NOTNULL(paddle::ResourceManager::Instance().GetXPUResource(stream));
    CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream), 1);
  }

  // Case 2: two predictors on two distinct external streams get independent
  // resource entries — ref count 1 each and different stream keys.
  {
    auto context1 = baidu::xpu::api::create_context();
    xpu_stream_create(&context1->xpu_stream);

    Config config;
    config.SetModel(FLAGS_dirname);
    config.EnableXpu();
    config.SetExecStream(static_cast<void*>(context1->xpu_stream));
    auto predictor = CreatePredictor(config);
    auto stream1 = predictor->GetExecStream();
    CHECK_NOTNULL(paddle::ResourceManager::Instance().GetXPUResource(stream1));
    CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream1), 1);

    auto context2 = baidu::xpu::api::create_context();
    xpu_stream_create(&context2->xpu_stream);

    Config config2;
    config2.SetModel(FLAGS_dirname);
    config2.EnableXpu();
    config2.SetExecStream(static_cast<void*>(context2->xpu_stream));
    auto predictor2 = CreatePredictor(config2);
    auto stream2 = predictor2->GetExecStream();
    CHECK_NOTNULL(paddle::ResourceManager::Instance().GetXPUResource(stream2));
    CHECK_EQ(paddle::ResourceManager::Instance().RefCount(stream2), 1);
    CHECK_NE(stream1, stream2);
  }
  // NOTE(review): the xdnn contexts and streams created above are never
  // destroyed before the test returns — presumably acceptable for a test
  // process, but worth confirming or cleaning up explicitly.
}
#endif
TEST(AnalysisPredictor, OutputHookFunc) {
auto hookfunc = [](const std::string& type,
const std::string& var_name,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册