Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Paddle
提交
1418a719
P
Paddle
项目概览
PaddlePaddle
/
Paddle
大约 2 年 前同步成功
通知
2325
Star
20933
Fork
5424
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1423
列表
看板
标记
里程碑
合并请求
543
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1,423
Issue
1,423
列表
看板
标记
里程碑
合并请求
543
合并请求
543
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1418a719
编写于
9月 20, 2022
作者:
L
Leo Chen
提交者:
GitHub
9月 20, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[cherry-pick] Refine thread pool config of interpretercore (#46219)
* add config * add config * follow comments * fix serial run
上级
50340302
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
198 addition
and
22 deletion
+198
-22
paddle/fluid/framework/new_executor/interpretercore.cc
paddle/fluid/framework/new_executor/interpretercore.cc
+26
-13
paddle/fluid/framework/new_executor/interpretercore.h
paddle/fluid/framework/new_executor/interpretercore.h
+1
-1
paddle/fluid/framework/new_executor/interpretercore_util.cc
paddle/fluid/framework/new_executor/interpretercore_util.cc
+8
-5
paddle/fluid/framework/new_executor/interpretercore_util.h
paddle/fluid/framework/new_executor/interpretercore_util.h
+6
-0
paddle/fluid/framework/new_executor/threadpool_config.h
paddle/fluid/framework/new_executor/threadpool_config.h
+136
-0
paddle/fluid/framework/new_executor/workqueue/workqueue.cc
paddle/fluid/framework/new_executor/workqueue/workqueue.cc
+21
-3
未找到文件。
paddle/fluid/framework/new_executor/interpretercore.cc
浏览文件 @
1418a719
...
...
@@ -19,6 +19,7 @@
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/new_executor/threadpool_config.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/os_info.h"
...
...
@@ -47,9 +48,6 @@ constexpr const char* kTaskCompletion = "TaskCompletion";
namespace
paddle
{
namespace
framework
{
// NOTE(Aurelius84): Need a better strategy to determine it.
static
constexpr
size_t
kHostNumThreads
=
4
;
static
constexpr
size_t
kDeviceNumThreads
=
1
;
InterpreterCore
::
InterpreterCore
(
const
platform
::
Place
&
place
,
const
BlockDesc
&
block
,
...
...
@@ -293,8 +291,14 @@ bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
std
::
shared_ptr
<
interpreter
::
AsyncWorkQueue
>
InterpreterCore
::
GetWorkQueue
()
{
if
(
async_work_queue_
==
nullptr
)
{
async_work_queue_
=
std
::
make_shared
<
interpreter
::
AsyncWorkQueue
>
(
kHostNumThreads
,
kDeviceNumThreads
,
&
main_thread_blocker_
);
int
host_num_threads
=
1
,
deivce_num_threads
=
1
,
prepare_num_threads
=
1
;
std
::
tie
(
host_num_threads
,
deivce_num_threads
,
prepare_num_threads
)
=
interpreter
::
GetThreadPoolConfig
(
place_
,
vec_instruction_
.
size
());
async_work_queue_
=
std
::
make_shared
<
interpreter
::
AsyncWorkQueue
>
(
host_num_threads
,
deivce_num_threads
,
prepare_num_threads
,
&
main_thread_blocker_
);
}
return
async_work_queue_
;
}
...
...
@@ -773,14 +777,23 @@ void InterpreterCore::ExecuteInstructionList(
platform
::
RecordEvent
record_prepare
(
"PrepareAtomic"
,
platform
::
TracerEventType
::
UserDefined
,
1
);
std
::
unique_ptr
<
std
::
vector
<
std
::
atomic
<
size_t
>>>
atomic_deps
=
nullptr
;
std
::
unique_ptr
<
std
::
vector
<
std
::
atomic
<
size_t
>>>
atomic_var_ref
=
nullptr
;
if
(
async_work_queue_
->
QueueNumThreads
(
kPrepareWorkQueueIdx
))
{
// NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
// those for the next step
auto
atomic_deps
=
atomic_deps_
.
get
();
auto
atomic_var_ref
=
atomic_var_ref_
.
get
();
atomic_deps
=
atomic_deps_
.
get
();
atomic_var_ref
=
atomic_var_ref_
.
get
();
atomic_deps_
=
async_work_queue_
->
PrepareAtomicDeps
(
dependecy_count_
);
atomic_var_ref_
=
async_work_queue_
->
PrepareAtomicVarRef
(
var_scope_
.
VecMetaInfo
());
}
else
{
atomic_deps
=
interpreter
::
PrepareAtomicDeps
(
dependecy_count_
);
atomic_var_ref
=
interpreter
::
PrepareAtomicVarRef
(
var_scope_
.
VecMetaInfo
());
}
record_prepare
.
End
();
exception_holder_
.
Clear
();
...
...
paddle/fluid/framework/new_executor/interpretercore.h
浏览文件 @
1418a719
...
...
@@ -129,7 +129,7 @@ class InterpreterCore {
std
::
vector
<
Instruction
>
vec_instruction_
;
// deconstruct before OpFuncNode
// last_live_ops_[i] contains the id of operatos that last access var[i]
// last_live_ops_[i] contains the id of operato
r
s that last access var[i]
std
::
map
<
size_t
,
std
::
set
<
size_t
>>
last_live_ops_
;
std
::
vector
<
size_t
>
dependecy_count_
;
...
...
paddle/fluid/framework/new_executor/interpretercore_util.cc
浏览文件 @
1418a719
...
...
@@ -42,10 +42,12 @@ namespace framework {
namespace
interpreter
{
using
VariableIdMap
=
std
::
map
<
std
::
string
,
std
::
vector
<
int
>>
;
constexpr
size_t
kPrepareWorkQueueIdx
=
2
;
const
std
::
vector
<
WorkQueueOptions
>
ConstructWorkQueueOptions
(
size_t
host_num_threads
,
size_t
device_num_threads
,
EventsWaiter
*
waiter
)
{
size_t
host_num_threads
,
size_t
device_num_threads
,
size_t
prepare_num_threads
,
EventsWaiter
*
waiter
)
{
std
::
vector
<
WorkQueueOptions
>
group_options
;
// for execute host Kernel
group_options
.
emplace_back
(
/*name*/
"HostTasks"
,
...
...
@@ -65,7 +67,7 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
/*events_waiter*/
waiter
);
// for prepare deps and others
group_options
.
emplace_back
(
/*name*/
"Prepare"
,
/*num_threads*/
1
,
/*num_threads*/
prepare_num_threads
,
/*allow_spinning*/
true
,
/*always_spinning*/
false
,
/*track_task*/
false
,
...
...
@@ -76,10 +78,11 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
AsyncWorkQueue
::
AsyncWorkQueue
(
size_t
host_num_threads
,
size_t
device_num_threads
,
size_t
prepare_num_threads
,
EventsWaiter
*
waiter
)
:
host_num_thread_
(
host_num_threads
)
{
queue_group_
=
CreateWorkQueueGroup
(
ConstructWorkQueueOptions
(
host_num_threads
,
devic
e_num_threads
,
waiter
));
queue_group_
=
CreateWorkQueueGroup
(
ConstructWorkQueueOptions
(
host_num_threads
,
device_num_threads
,
prepar
e_num_threads
,
waiter
));
}
void
AsyncWorkQueue
::
AddTask
(
const
OpFuncType
&
op_func_type
,
...
...
paddle/fluid/framework/new_executor/interpretercore_util.h
浏览文件 @
1418a719
...
...
@@ -39,6 +39,7 @@
#include "paddle/fluid/platform/init.h"
using
AtomicVectorSizeT
=
std
::
vector
<
std
::
atomic
<
size_t
>>
;
constexpr
size_t
kPrepareWorkQueueIdx
=
2
;
namespace
paddle
{
namespace
framework
{
...
...
@@ -48,6 +49,7 @@ class AsyncWorkQueue {
public:
AsyncWorkQueue
(
size_t
host_num_threads
,
size_t
deivce_num_threads
,
size_t
prepare_num_threads
,
EventsWaiter
*
waiter
);
std
::
future
<
std
::
unique_ptr
<
AtomicVectorSizeT
>>
PrepareAtomicDeps
(
...
...
@@ -61,6 +63,10 @@ class AsyncWorkQueue {
void
Cancel
()
{
queue_group_
->
Cancel
();
}
size_t
QueueNumThreads
(
size_t
idx
)
{
return
queue_group_
->
QueueNumThreads
(
idx
);
}
private:
size_t
host_num_thread_
;
std
::
unique_ptr
<
WorkQueueGroup
>
queue_group_
;
...
...
paddle/fluid/framework/new_executor/threadpool_config.h
0 → 100644
浏览文件 @
1418a719
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <thread>
#include "paddle/fluid/platform/device/ipu/ipu_info.h"
#include "paddle/fluid/platform/device/npu/npu_info.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/backends/device_manager.h"
#include "paddle/phi/backends/gpu/gpu_info.h"
#include "paddle/phi/backends/xpu/xpu_info.h"
DECLARE_bool
(
new_executor_serial_run
);
namespace
paddle
{
namespace
framework
{
namespace
interpreter
{
static
constexpr
size_t
kHostNumThreads
=
4
;
static
constexpr
size_t
kDeviceNumThreads
=
1
;
static
constexpr
size_t
kNumGcThreads
=
1
;
static
constexpr
size_t
kNumPrepareThreads
=
0
;
static
constexpr
size_t
kMinOpNumForAsyncPrepare
=
1000
;
// By default, one interpretercore contains:
// 1-size thread pool for device kernel launch (or 0 for cpu execution),
// 1-size thread pool for host kernel launch (or more if the system contains
// enough processors).
// And it may contain:
// 1-size thread pool for gc if it is can not use FastGC,
// 1-size thread pool for preparation if the program contains two many ops
// (1000+).
// Note that the purpose of the config is to limit the total 'possible'
// threads introduced by interpretercore to avoid hurting performance.
inline
std
::
tuple
<
int
,
int
,
int
>
GetThreadPoolConfig
(
const
phi
::
Place
place
,
size_t
op_num
)
{
int
num_device_threads
=
kDeviceNumThreads
,
num_host_threads
=
kHostNumThreads
,
num_prepare_threads
=
kNumPrepareThreads
;
if
(
op_num
>
kMinOpNumForAsyncPrepare
)
{
num_prepare_threads
=
1
;
}
int
device_count
=
0
,
processor_count
=
0
;
if
(
platform
::
is_cpu_place
(
place
))
{
num_device_threads
=
0
;
num_host_threads
=
4
;
}
else
{
processor_count
=
std
::
thread
::
hardware_concurrency
();
if
(
processor_count
)
{
if
(
platform
::
is_gpu_place
(
place
))
{
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
device_count
=
phi
::
backends
::
gpu
::
GetGPUDeviceCount
();
#endif
}
if
(
platform
::
is_xpu_place
(
place
))
{
#if defined(PADDLE_WITH_XPU)
device_count
=
phi
::
backends
::
xpu
::
GetXPUDeviceCount
();
#endif
}
if
(
platform
::
is_npu_place
(
place
))
{
#if defined(PADDLE_WITH_ASCEND_CL)
device_count
=
platform
::
GetNPUDeviceCount
();
#endif
}
if
(
platform
::
is_ipu_place
(
place
))
{
#if defined(PADDLE_WITH_IPU)
device_count
=
platform
::
GetIPUDeviceCount
();
#endif
}
if
(
platform
::
is_custom_place
(
place
))
{
#if defined(PADDLE_WITH_CUSTOM_DEVICE)
device_count
=
phi
::
DeviceManager
::
GetDeviceCount
(
place
.
GetDeviceType
());
#endif
}
// Tricky implementation.
// In multi-card training, each card may set env like
// CUDA_VISIBLE_DEVICE=0 In that case, device_count is set to 8.
if
(
device_count
==
1
)
{
device_count
=
8
;
// in many case, the accelerator has 8 cards.
}
// We expect processor_count = 2 * (the possible total threads when doing
// multi-card training), to make sure that the system will not slow down
// because of too many threads. Here, 2 is experience value. Since each
// device has one interpretercore, the possible total threads when doing
// multi-card training = device_count * (the possible total threads in one
// interpretercore).
if
(
device_count
)
{
auto
num
=
processor_count
/
device_count
/
2
-
(
kNumGcThreads
+
kNumPrepareThreads
+
num_device_threads
);
num_host_threads
=
num
>
0
?
(
num
>
kHostNumThreads
?
kHostNumThreads
:
num
)
:
1
;
}
}
}
// In serial run, only one 1-size thread pool is used
if
(
FLAGS_new_executor_serial_run
)
{
num_host_threads
=
0
;
num_device_threads
=
1
;
}
VLOG
(
4
)
<<
"place:"
<<
place
<<
", processor_count:"
<<
processor_count
<<
", device_count:"
<<
device_count
<<
", serial_run:"
<<
FLAGS_new_executor_serial_run
<<
", num_host_threads:"
<<
num_host_threads
<<
", num_device_threads:"
<<
num_device_threads
<<
", num_prepare_threads:"
<<
num_prepare_threads
;
return
std
::
make_tuple
(
num_host_threads
,
num_device_threads
,
num_prepare_threads
);
}
}
// namespace interpreter
}
// namespace framework
}
// namespace paddle
paddle/fluid/framework/new_executor/workqueue/workqueue.cc
浏览文件 @
1418a719
...
...
@@ -121,8 +121,13 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
queues_
.
resize
(
num_queues
);
void
*
buffer
=
malloc
(
sizeof
(
NonblockingThreadPool
)
*
num_queues
);
queues_storage_
=
reinterpret_cast
<
NonblockingThreadPool
*>
(
buffer
);
for
(
size_t
idx
=
0
;
idx
<
num_queues
;
++
idx
)
{
const
auto
&
options
=
queues_options_
[
idx
];
if
(
options
.
num_threads
==
0
)
{
queues_
[
idx
]
=
nullptr
;
continue
;
}
if
(
options
.
track_task
&&
tracker_
==
nullptr
&&
options
.
events_waiter
!=
nullptr
)
{
empty_notifier_
=
options
.
events_waiter
->
RegisterEvent
(
kQueueEmptyEvent
);
...
...
@@ -144,8 +149,10 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
WorkQueueGroupImpl
::~
WorkQueueGroupImpl
()
{
for
(
auto
queue
:
queues_
)
{
if
(
queue
)
{
queue
->~
NonblockingThreadPool
();
}
}
if
(
tracker_
!=
nullptr
)
{
tracker_
->~
TaskTracker
();
AlignedFree
(
tracker_
);
...
...
@@ -161,6 +168,10 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
platform
::
TracerEventType
::
UserDefined
,
10
/*level*/
);
assert
(
queue_idx
<
queues_
.
size
());
PADDLE_ENFORCE_NOT_NULL
(
queues_
.
at
(
queue_idx
),
platform
::
errors
::
NotFound
(
"Workqueue of index %d is not initialized."
,
queue_idx
));
if
(
queues_options_
.
at
(
queue_idx
).
track_task
)
{
fn
=
[
task
=
std
::
move
(
fn
),
raii
=
CounterGuard
<
TaskTracker
>
(
tracker_
)]()
mutable
{
task
();
};
...
...
@@ -170,6 +181,9 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
size_t
WorkQueueGroupImpl
::
QueueNumThreads
(
size_t
queue_idx
)
const
{
assert
(
queue_idx
<
queues_
.
size
());
if
(
!
queues_
.
at
(
queue_idx
))
{
return
0
;
}
return
queues_
.
at
(
queue_idx
)
->
NumThreads
();
}
...
...
@@ -183,11 +197,15 @@ size_t WorkQueueGroupImpl::QueueGroupNumThreads() const {
void
WorkQueueGroupImpl
::
Cancel
()
{
for
(
auto
queue
:
queues_
)
{
if
(
queue
)
{
queue
->
Cancel
();
}
}
for
(
auto
queue
:
queues_
)
{
if
(
queue
)
{
queue
->
WaitThreadsExit
();
}
}
}
}
// namespace
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录