未验证 提交 37ba18bb 编写于 作者: L Leo Chen 提交者: GitHub

Refine thread pool config of interpretercore (#46217)

* add config

* add config

* follow comments

* fix serial run
上级 f65a61a2
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "paddle/fluid/framework/details/nan_inf_utils.h" #include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h" #include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h" #include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/new_executor/threadpool_config.h"
#include "paddle/fluid/framework/operator.h" #include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h" #include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/os_info.h" #include "paddle/fluid/platform/os_info.h"
...@@ -47,9 +48,6 @@ constexpr const char* kTaskCompletion = "TaskCompletion"; ...@@ -47,9 +48,6 @@ constexpr const char* kTaskCompletion = "TaskCompletion";
namespace paddle { namespace paddle {
namespace framework { namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;
static constexpr size_t kDeviceNumThreads = 1;
InterpreterCore::InterpreterCore(const platform::Place& place, InterpreterCore::InterpreterCore(const platform::Place& place,
const BlockDesc& block, const BlockDesc& block,
...@@ -308,8 +306,14 @@ bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) { ...@@ -308,8 +306,14 @@ bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
std::shared_ptr<interpreter::AsyncWorkQueue> InterpreterCore::GetWorkQueue() { std::shared_ptr<interpreter::AsyncWorkQueue> InterpreterCore::GetWorkQueue() {
if (async_work_queue_ == nullptr) { if (async_work_queue_ == nullptr) {
async_work_queue_ = std::make_shared<interpreter::AsyncWorkQueue>( int host_num_threads = 1, deivce_num_threads = 1, prepare_num_threads = 1;
kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_); std::tie(host_num_threads, deivce_num_threads, prepare_num_threads) =
interpreter::GetThreadPoolConfig(place_, vec_instruction_.size());
async_work_queue_ =
std::make_shared<interpreter::AsyncWorkQueue>(host_num_threads,
deivce_num_threads,
prepare_num_threads,
&main_thread_blocker_);
} }
return async_work_queue_; return async_work_queue_;
} }
...@@ -788,14 +792,23 @@ void InterpreterCore::ExecuteInstructionList( ...@@ -788,14 +792,23 @@ void InterpreterCore::ExecuteInstructionList(
platform::RecordEvent record_prepare( platform::RecordEvent record_prepare(
"PrepareAtomic", platform::TracerEventType::UserDefined, 1); "PrepareAtomic", platform::TracerEventType::UserDefined, 1);
std::unique_ptr<std::vector<std::atomic<size_t>>> atomic_deps = nullptr;
std::unique_ptr<std::vector<std::atomic<size_t>>> atomic_var_ref = nullptr;
if (async_work_queue_->QueueNumThreads(kPrepareWorkQueueIdx)) {
// NOTE(zhiqiu): get the prepared deps from std::future, and async prepare // NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
// those for the next step // those for the next step
auto atomic_deps = atomic_deps_.get(); atomic_deps = atomic_deps_.get();
auto atomic_var_ref = atomic_var_ref_.get(); atomic_var_ref = atomic_var_ref_.get();
atomic_deps_ = async_work_queue_->PrepareAtomicDeps(dependecy_count_); atomic_deps_ = async_work_queue_->PrepareAtomicDeps(dependecy_count_);
atomic_var_ref_ = atomic_var_ref_ =
async_work_queue_->PrepareAtomicVarRef(var_scope_.VecMetaInfo()); async_work_queue_->PrepareAtomicVarRef(var_scope_.VecMetaInfo());
} else {
atomic_deps = interpreter::PrepareAtomicDeps(dependecy_count_);
atomic_var_ref = interpreter::PrepareAtomicVarRef(var_scope_.VecMetaInfo());
}
record_prepare.End(); record_prepare.End();
exception_holder_.Clear(); exception_holder_.Clear();
......
...@@ -129,7 +129,7 @@ class InterpreterCore { ...@@ -129,7 +129,7 @@ class InterpreterCore {
std::vector<Instruction> vec_instruction_; // deconstruct before OpFuncNode std::vector<Instruction> vec_instruction_; // deconstruct before OpFuncNode
// last_live_ops_[i] contains the id of operatos that last access var[i] // last_live_ops_[i] contains the id of operators that last access var[i]
std::map<size_t, std::set<size_t>> last_live_ops_; std::map<size_t, std::set<size_t>> last_live_ops_;
std::vector<size_t> dependecy_count_; std::vector<size_t> dependecy_count_;
......
...@@ -42,10 +42,12 @@ namespace framework { ...@@ -42,10 +42,12 @@ namespace framework {
namespace interpreter { namespace interpreter {
using VariableIdMap = std::map<std::string, std::vector<int>>; using VariableIdMap = std::map<std::string, std::vector<int>>;
constexpr size_t kPrepareWorkQueueIdx = 2;
const std::vector<WorkQueueOptions> ConstructWorkQueueOptions( const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
size_t host_num_threads, size_t device_num_threads, EventsWaiter* waiter) { size_t host_num_threads,
size_t device_num_threads,
size_t prepare_num_threads,
EventsWaiter* waiter) {
std::vector<WorkQueueOptions> group_options; std::vector<WorkQueueOptions> group_options;
// for execute host Kernel // for execute host Kernel
group_options.emplace_back(/*name*/ "HostTasks", group_options.emplace_back(/*name*/ "HostTasks",
...@@ -65,7 +67,7 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions( ...@@ -65,7 +67,7 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
/*events_waiter*/ waiter); /*events_waiter*/ waiter);
// for prepare deps and others // for prepare deps and others
group_options.emplace_back(/*name*/ "Prepare", group_options.emplace_back(/*name*/ "Prepare",
/*num_threads*/ 1, /*num_threads*/ prepare_num_threads,
/*allow_spinning*/ true, /*allow_spinning*/ true,
/*always_spinning*/ false, /*always_spinning*/ false,
/*track_task*/ false, /*track_task*/ false,
...@@ -76,10 +78,11 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions( ...@@ -76,10 +78,11 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
AsyncWorkQueue::AsyncWorkQueue(size_t host_num_threads, AsyncWorkQueue::AsyncWorkQueue(size_t host_num_threads,
size_t device_num_threads, size_t device_num_threads,
size_t prepare_num_threads,
EventsWaiter* waiter) EventsWaiter* waiter)
: host_num_thread_(host_num_threads) { : host_num_thread_(host_num_threads) {
queue_group_ = CreateWorkQueueGroup( queue_group_ = CreateWorkQueueGroup(ConstructWorkQueueOptions(
ConstructWorkQueueOptions(host_num_threads, device_num_threads, waiter)); host_num_threads, device_num_threads, prepare_num_threads, waiter));
} }
void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type, void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type,
......
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#include "paddle/fluid/platform/init.h" #include "paddle/fluid/platform/init.h"
using AtomicVectorSizeT = std::vector<std::atomic<size_t>>; using AtomicVectorSizeT = std::vector<std::atomic<size_t>>;
constexpr size_t kPrepareWorkQueueIdx = 2;
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -48,6 +49,7 @@ class AsyncWorkQueue { ...@@ -48,6 +49,7 @@ class AsyncWorkQueue {
public: public:
AsyncWorkQueue(size_t host_num_threads, AsyncWorkQueue(size_t host_num_threads,
size_t deivce_num_threads, size_t deivce_num_threads,
size_t prepare_num_threads,
EventsWaiter* waiter); EventsWaiter* waiter);
std::future<std::unique_ptr<AtomicVectorSizeT>> PrepareAtomicDeps( std::future<std::unique_ptr<AtomicVectorSizeT>> PrepareAtomicDeps(
...@@ -61,6 +63,10 @@ class AsyncWorkQueue { ...@@ -61,6 +63,10 @@ class AsyncWorkQueue {
void Cancel() { queue_group_->Cancel(); } void Cancel() { queue_group_->Cancel(); }
size_t QueueNumThreads(size_t idx) {
return queue_group_->QueueNumThreads(idx);
}
private: private:
size_t host_num_thread_; size_t host_num_thread_;
std::unique_ptr<WorkQueueGroup> queue_group_; std::unique_ptr<WorkQueueGroup> queue_group_;
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <thread>
#include "paddle/fluid/platform/device/ipu/ipu_info.h"
#include "paddle/fluid/platform/device/npu/npu_info.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/backends/device_manager.h"
#include "paddle/phi/backends/gpu/gpu_info.h"
#include "paddle/phi/backends/xpu/xpu_info.h"
DECLARE_bool(new_executor_serial_run);
namespace paddle {
namespace framework {
namespace interpreter {
static constexpr size_t kHostNumThreads = 4;
static constexpr size_t kDeviceNumThreads = 1;
static constexpr size_t kNumGcThreads = 1;
static constexpr size_t kNumPrepareThreads = 0;
static constexpr size_t kMinOpNumForAsyncPrepare = 1000;
// By default, one interpretercore contains:
// 1-size thread pool for device kernel launch (or 0 for cpu execution),
// 1-size thread pool for host kernel launch (or more if the system contains
// enough processors).
// And it may contain:
// 1-size thread pool for gc if it is can not use FastGC,
// 1-size thread pool for preparation if the program contains two many ops
// (1000+).
// Note that the purpose of the config is to limit the total 'possible'
// threads introduced by interpretercore to avoid hurting performance.
inline std::tuple<int, int, int> GetThreadPoolConfig(const phi::Place place,
size_t op_num) {
int num_device_threads = kDeviceNumThreads,
num_host_threads = kHostNumThreads,
num_prepare_threads = kNumPrepareThreads;
if (op_num > kMinOpNumForAsyncPrepare) {
num_prepare_threads = 1;
}
int device_count = 0, processor_count = 0;
if (platform::is_cpu_place(place)) {
num_device_threads = 0;
num_host_threads = 4;
} else {
processor_count = std::thread::hardware_concurrency();
if (processor_count) {
if (platform::is_gpu_place(place)) {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
device_count = phi::backends::gpu::GetGPUDeviceCount();
#endif
}
if (platform::is_xpu_place(place)) {
#if defined(PADDLE_WITH_XPU)
device_count = phi::backends::xpu::GetXPUDeviceCount();
#endif
}
if (platform::is_npu_place(place)) {
#if defined(PADDLE_WITH_ASCEND_CL)
device_count = platform::GetNPUDeviceCount();
#endif
}
if (platform::is_ipu_place(place)) {
#if defined(PADDLE_WITH_IPU)
device_count = platform::GetIPUDeviceCount();
#endif
}
if (platform::is_custom_place(place)) {
#if defined(PADDLE_WITH_CUSTOM_DEVICE)
device_count =
phi::DeviceManager::GetDeviceCount(place.GetDeviceType());
#endif
}
// Tricky implementation.
// In multi-card training, each card may set env like
// CUDA_VISIBLE_DEVICE=0 In that case, device_count is set to 8.
if (device_count == 1) {
device_count = 8; // in many case, the accelerator has 8 cards.
}
// We expect processor_count = 2 * (the possible total threads when doing
// multi-card training), to make sure that the system will not slow down
// because of too many threads. Here, 2 is experience value. Since each
// device has one interpretercore, the possible total threads when doing
// multi-card training = device_count * (the possible total threads in one
// interpretercore).
if (device_count) {
auto num = processor_count / device_count / 2 -
(kNumGcThreads + kNumPrepareThreads + num_device_threads);
num_host_threads =
num > 0 ? (num > kHostNumThreads ? kHostNumThreads : num) : 1;
}
}
}
// In serial run, only one 1-size thread pool is used
if (FLAGS_new_executor_serial_run) {
num_host_threads = 0;
num_device_threads = 1;
}
VLOG(4) << "place:" << place << ", processor_count:" << processor_count
<< ", device_count:" << device_count
<< ", serial_run:" << FLAGS_new_executor_serial_run
<< ", num_host_threads:" << num_host_threads
<< ", num_device_threads:" << num_device_threads
<< ", num_prepare_threads:" << num_prepare_threads;
return std::make_tuple(
num_host_threads, num_device_threads, num_prepare_threads);
}
} // namespace interpreter
} // namespace framework
} // namespace paddle
...@@ -121,8 +121,13 @@ WorkQueueGroupImpl::WorkQueueGroupImpl( ...@@ -121,8 +121,13 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
queues_.resize(num_queues); queues_.resize(num_queues);
void* buffer = malloc(sizeof(NonblockingThreadPool) * num_queues); void* buffer = malloc(sizeof(NonblockingThreadPool) * num_queues);
queues_storage_ = reinterpret_cast<NonblockingThreadPool*>(buffer); queues_storage_ = reinterpret_cast<NonblockingThreadPool*>(buffer);
for (size_t idx = 0; idx < num_queues; ++idx) { for (size_t idx = 0; idx < num_queues; ++idx) {
const auto& options = queues_options_[idx]; const auto& options = queues_options_[idx];
if (options.num_threads == 0) {
queues_[idx] = nullptr;
continue;
}
if (options.track_task && tracker_ == nullptr && if (options.track_task && tracker_ == nullptr &&
options.events_waiter != nullptr) { options.events_waiter != nullptr) {
empty_notifier_ = options.events_waiter->RegisterEvent(kQueueEmptyEvent); empty_notifier_ = options.events_waiter->RegisterEvent(kQueueEmptyEvent);
...@@ -144,8 +149,10 @@ WorkQueueGroupImpl::WorkQueueGroupImpl( ...@@ -144,8 +149,10 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
WorkQueueGroupImpl::~WorkQueueGroupImpl() { WorkQueueGroupImpl::~WorkQueueGroupImpl() {
for (auto queue : queues_) { for (auto queue : queues_) {
if (queue) {
queue->~NonblockingThreadPool(); queue->~NonblockingThreadPool();
} }
}
if (tracker_ != nullptr) { if (tracker_ != nullptr) {
tracker_->~TaskTracker(); tracker_->~TaskTracker();
AlignedFree(tracker_); AlignedFree(tracker_);
...@@ -161,6 +168,10 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) { ...@@ -161,6 +168,10 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
platform::TracerEventType::UserDefined, platform::TracerEventType::UserDefined,
10 /*level*/); 10 /*level*/);
assert(queue_idx < queues_.size()); assert(queue_idx < queues_.size());
PADDLE_ENFORCE_NOT_NULL(
queues_.at(queue_idx),
platform::errors::NotFound("Workqueue of index %d is not initialized.",
queue_idx));
if (queues_options_.at(queue_idx).track_task) { if (queues_options_.at(queue_idx).track_task) {
fn = [task = std::move(fn), fn = [task = std::move(fn),
raii = CounterGuard<TaskTracker>(tracker_)]() mutable { task(); }; raii = CounterGuard<TaskTracker>(tracker_)]() mutable { task(); };
...@@ -170,6 +181,9 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) { ...@@ -170,6 +181,9 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
size_t WorkQueueGroupImpl::QueueNumThreads(size_t queue_idx) const { size_t WorkQueueGroupImpl::QueueNumThreads(size_t queue_idx) const {
assert(queue_idx < queues_.size()); assert(queue_idx < queues_.size());
if (!queues_.at(queue_idx)) {
return 0;
}
return queues_.at(queue_idx)->NumThreads(); return queues_.at(queue_idx)->NumThreads();
} }
...@@ -183,11 +197,15 @@ size_t WorkQueueGroupImpl::QueueGroupNumThreads() const { ...@@ -183,11 +197,15 @@ size_t WorkQueueGroupImpl::QueueGroupNumThreads() const {
void WorkQueueGroupImpl::Cancel() { void WorkQueueGroupImpl::Cancel() {
for (auto queue : queues_) { for (auto queue : queues_) {
if (queue) {
queue->Cancel(); queue->Cancel();
} }
}
for (auto queue : queues_) { for (auto queue : queues_) {
if (queue) {
queue->WaitThreadsExit(); queue->WaitThreadsExit();
} }
}
} }
} // namespace } // namespace
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册