Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
4c3361cd
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4c3361cd
编写于
3月 24, 2018
作者:
Y
Yu Yang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Extract GraphExecutor
上级
b123e43b
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
194 addition
and
129 deletion
+194
-129
paddle/fluid/framework/parallel_executor.cc
paddle/fluid/framework/parallel_executor.cc
+194
-129
未找到文件。
paddle/fluid/framework/parallel_executor.cc
浏览文件 @
4c3361cd
...
...
@@ -24,42 +24,184 @@ limitations under the License. */
namespace
paddle
{
namespace
framework
{
class
ParallelExecutorPrivate
{
using
details
::
DummyVarHandle
;
using
details
::
FetchOpHandle
;
using
details
::
OpHandleBase
;
using
details
::
SSAGraph
;
using
details
::
VarHandleBase
;
class
SSAGraphExecutor
{
DISABLE_COPY_AND_ASSIGN
(
SSAGraphExecutor
);
public:
explicit
ParallelExecutorPrivate
(
size_t
num_threads
,
const
std
::
vector
<
platform
::
Place
>
&
places
)
:
places_
(
places
),
fetch_dev_ctxs_
(
places
),
pool_
(
num_threads
<=
1
?
nullptr
:
new
ThreadPool
(
num_threads
))
{}
explicit
SSAGraphExecutor
(
SSAGraph
*
graph
)
:
graph_
(
*
graph
)
{}
std
::
vector
<
platform
::
Place
>
places_
;
platform
::
DeviceContextPool
fetch_dev_ctxs_
;
std
::
vector
<
Scope
*>
local_scopes_
;
Scope
*
global_scope_
;
virtual
~
SSAGraphExecutor
()
{}
std
::
unique_ptr
<
platform
::
NCCLContextMap
>
nccl_ctxs_
;
virtual
void
Run
(
Scope
*
global_scope
,
const
std
::
vector
<
std
::
string
>
&
fetch_tensors
,
const
std
::
string
&
fetch_list_name
)
=
0
;
details
::
SSAGraph
graph_
;
protected:
SSAGraph
&
graph_
;
};
// Use a simpler thread pool, might be faster.
std
::
unique_ptr
<
ThreadPool
>
pool_
;
class
ThreadedSSAGraphExecutor
:
public
SSAGraphExecutor
{
public:
ThreadedSSAGraphExecutor
(
size_t
num_threads
,
bool
use_event
,
const
std
::
vector
<
Scope
*>
&
local_scopes
,
const
std
::
vector
<
platform
::
Place
>
&
places
,
SSAGraph
*
graph
)
:
SSAGraphExecutor
(
graph
),
pool_
(
num_threads
>=
2
?
new
::
ThreadPool
(
num_threads
)
:
nullptr
),
local_scopes_
(
local_scopes
),
places_
(
places
),
fetch_ctxs_
(
places
),
use_event_
(
use_event
)
{}
void
Run
(
Scope
*
global_scope
,
const
std
::
vector
<
std
::
string
>
&
fetch_tensors
,
const
std
::
string
&
fetch_list_name
)
override
{
std
::
unordered_map
<
OpHandleBase
*
,
size_t
>
pending_ops
;
std
::
unordered_map
<
VarHandleBase
*
,
std
::
atomic
<
bool
>>
pending_vars
;
std
::
unordered_set
<
OpHandleBase
*>
ready_ops
;
auto
InsertPendingVar
=
[
&
pending_vars
](
VarHandleBase
&
var
)
{
pending_vars
[
&
var
]
=
var
.
generated_op_
==
nullptr
;
};
std
::
unique_ptr
<
platform
::
EnforceNotMet
>
exception_
;
auto
InsertPendingOp
=
[
&
pending_ops
](
OpHandleBase
&
op_instance
)
{
pending_ops
.
insert
({
&
op_instance
,
op_instance
.
inputs_
.
size
()});
};
// Transform SSAGraph to pending_ops & pending_vars
for
(
auto
&
var_map
:
graph_
.
vars_
)
{
for
(
auto
&
name_pair
:
var_map
)
{
for
(
auto
&
version_pair
:
name_pair
.
second
)
{
InsertPendingVar
(
version_pair
.
second
);
}
}
}
for
(
auto
&
var
:
graph_
.
dep_vars_
)
{
InsertPendingVar
(
*
var
);
}
for
(
auto
&
op
:
graph_
.
ops_
)
{
if
(
op
->
inputs_
.
empty
())
{
// Special case, Op has no input.
ready_ops
.
insert
(
op
.
get
());
}
else
{
InsertPendingOp
(
*
op
);
}
}
// Step 2. Insert FetchOps
std
::
vector
<
FetchOpHandle
>
fetch_ops
;
std
::
vector
<
DummyVarHandle
>
dummy_vars
;
FeedFetchList
fetch_data
(
fetch_tensors
.
size
());
std
::
unordered_map
<
std
::
string
,
std
::
vector
<
VarHandleBase
*>>
fetched_vars
;
for
(
auto
&
fetch_var_name
:
fetch_tensors
)
{
for
(
auto
&
var_map
:
graph_
.
vars_
)
{
auto
it
=
var_map
.
find
(
fetch_var_name
);
if
(
it
!=
var_map
.
end
())
{
fetched_vars
[
fetch_var_name
].
push_back
(
&
it
->
second
.
rbegin
()
->
second
);
}
}
}
void
RunOp
(
bool
use_event
,
std
::
unordered_map
<
details
::
VarHandleBase
*
,
std
::
atomic
<
bool
>>
&
pending_vars
,
details
::
OpHandleBase
*
op
)
{
for
(
size_t
i
=
0
;
i
<
fetch_tensors
.
size
();
++
i
)
{
auto
&
var_name
=
fetch_tensors
[
i
];
auto
&
vars
=
fetched_vars
[
var_name
];
fetch_ops
.
emplace_back
(
&
fetch_data
,
i
,
&
local_scopes_
);
details
::
FetchOpHandle
*
op
=
&
fetch_ops
.
back
();
// FIXME: Use new device context
for
(
auto
&
p
:
places_
)
{
op
->
dev_ctx_
[
p
]
=
fetch_ctxs_
.
Get
(
p
);
}
for
(
auto
*
var
:
vars
)
{
op
->
AddInput
(
var
);
}
dummy_vars
.
emplace_back
();
auto
*
var
=
&
dummy_vars
.
back
();
var
->
generated_op_
=
nullptr
;
op
->
AddOutput
(
var
);
InsertPendingVar
(
*
var
);
InsertPendingOp
(
*
op
);
}
auto
run_all_ready_ops
=
[
&
]
{
for
(
auto
*
op
:
ready_ops
)
{
RunOp
(
pending_vars
,
op
);
}
ready_ops
.
clear
();
};
// Step 3. Execution
while
(
!
pending_vars
.
empty
())
{
// 1. Run All Ready ops
run_all_ready_ops
();
// 2. Find ready variable
VarHandleBase
*
ready_var
=
nullptr
;
for
(
auto
&
pair
:
pending_vars
)
{
if
(
pair
.
second
.
load
(
std
::
memory_order_acquire
))
{
ready_var
=
pair
.
first
;
break
;
}
}
// if there is no variable ready
if
(
ready_var
==
nullptr
)
{
// FIXME use conditional var instead of busy wait.
// if there is an exception, throw it
if
(
exception_
)
{
throw
*
exception_
;
}
// keep waiting the ready variables
continue
;
}
// 3. Remove the dependency of ready_var.
// Find the ready_ops after the ready_var.
pending_vars
.
erase
(
ready_var
);
for
(
auto
*
op
:
ready_var
->
pending_ops_
)
{
auto
&
deps
=
pending_ops
[
op
];
--
deps
;
if
(
deps
==
0
)
{
ready_ops
.
insert
(
op
);
}
}
// Keep loop until all vars are ready.
}
// Wait FetchOps.
for
(
auto
&
fetch_op
:
fetch_ops
)
{
fetch_op
.
WaitAndMergeCPUTensors
();
}
*
global_scope
->
Var
(
fetch_list_name
)
->
GetMutable
<
FeedFetchList
>
()
=
fetch_data
;
}
~
ThreadedSSAGraphExecutor
()
{}
private:
void
RunOp
(
std
::
unordered_map
<
VarHandleBase
*
,
std
::
atomic
<
bool
>>
&
pending_vars
,
details
::
OpHandleBase
*
op
)
{
std
::
vector
<
std
::
atomic
<
bool
>
*>
*
ready_buffer
=
new
std
::
vector
<
std
::
atomic
<
bool
>
*>
();
for
(
auto
*
var
:
op
->
outputs_
)
{
ready_buffer
->
emplace_back
(
&
pending_vars
[
var
]);
}
auto
op_run
=
[
ready_buffer
,
op
,
this
,
use_event
]
{
auto
op_run
=
[
ready_buffer
,
op
,
this
]
{
try
{
VLOG
(
10
)
<<
op
->
DebugString
();
op
->
Run
(
use_event
);
op
->
Run
(
use_event
_
);
for
(
auto
*
ready
:
*
ready_buffer
)
{
ready
->
store
(
true
,
std
::
memory_order_release
);
}
...
...
@@ -76,6 +218,31 @@ class ParallelExecutorPrivate {
op_run
();
}
}
private:
std
::
unique_ptr
<::
ThreadPool
>
pool_
;
std
::
vector
<
Scope
*>
local_scopes_
;
std
::
vector
<
platform
::
Place
>
places_
;
platform
::
DeviceContextPool
fetch_ctxs_
;
const
bool
use_event_
;
std
::
unique_ptr
<
platform
::
EnforceNotMet
>
exception_
;
};
class
ParallelExecutorPrivate
{
public:
explicit
ParallelExecutorPrivate
(
const
std
::
vector
<
platform
::
Place
>
&
places
)
:
places_
(
places
),
fetch_dev_ctxs_
(
places
)
{}
std
::
vector
<
platform
::
Place
>
places_
;
platform
::
DeviceContextPool
fetch_dev_ctxs_
;
std
::
vector
<
Scope
*>
local_scopes_
;
Scope
*
global_scope_
;
std
::
unique_ptr
<
platform
::
NCCLContextMap
>
nccl_ctxs_
;
details
::
SSAGraph
graph_
;
std
::
unique_ptr
<
SSAGraphExecutor
>
executor_
;
};
ParallelExecutor
::
ParallelExecutor
(
...
...
@@ -83,7 +250,7 @@ ParallelExecutor::ParallelExecutor(
const
std
::
unordered_set
<
std
::
string
>
&
params
,
const
ProgramDesc
&
startup_program
,
const
ProgramDesc
&
main_program
,
const
std
::
string
&
loss_var_name
,
Scope
*
scope
)
:
member_
(
new
ParallelExecutorPrivate
(
num_threads
,
places
))
{
:
member_
(
new
ParallelExecutorPrivate
(
places
))
{
member_
->
global_scope_
=
scope
;
// Step 1. RunStartupProgram and Bcast the params to devs.
...
...
@@ -109,6 +276,9 @@ ParallelExecutor::ParallelExecutor(
member_
->
nccl_ctxs_
.
get
());
builder
.
Build
(
main_program
,
&
member_
->
graph_
);
member_
->
executor_
.
reset
(
new
ThreadedSSAGraphExecutor
(
num_threads
,
true
,
member_
->
local_scopes_
,
places
,
&
member_
->
graph_
));
// Step 3. Create vars in each scope;
for
(
auto
*
scope
:
member_
->
local_scopes_
)
{
for
(
auto
*
var
:
main_program
.
Block
(
0
).
AllVars
())
{
...
...
@@ -168,113 +338,8 @@ void ParallelExecutor::BuildNCCLCommunicator() const {
void
ParallelExecutor
::
Run
(
const
std
::
vector
<
std
::
string
>
&
fetch_tensors
,
const
std
::
string
&
fetched_var_name
)
{
bool
use_event
=
true
;
FeedFetchList
fetched_data
(
fetch_tensors
.
size
());
// Version --> VarHandle
member_
->
exception_
.
reset
();
std
::
unordered_map
<
details
::
VarHandleBase
*
,
std
::
atomic
<
bool
>>
pending_vars
;
std
::
unordered_map
<
details
::
OpHandleBase
*
,
size_t
>
pending_ops
;
std
::
vector
<
details
::
DummyVarHandle
>
dummy_vars
;
for
(
auto
&
var_map
:
member_
->
graph_
.
vars_
)
{
for
(
auto
&
name_pair
:
var_map
)
{
for
(
auto
&
version_pair
:
name_pair
.
second
)
{
pending_vars
[
&
version_pair
.
second
]
=
version_pair
.
second
.
generated_op_
==
nullptr
;
}
}
}
for
(
auto
&
var
:
member_
->
graph_
.
dep_vars_
)
{
pending_vars
[
var
.
get
()]
=
var
->
generated_op_
==
nullptr
;
}
std
::
vector
<
details
::
OpHandleBase
*>
to_run
;
for
(
auto
&
op
:
member_
->
graph_
.
ops_
)
{
if
(
op
->
inputs_
.
empty
())
{
// Special case, Op has no input.
to_run
.
emplace_back
(
op
.
get
());
}
else
{
pending_ops
.
insert
({
op
.
get
(),
op
->
inputs_
.
size
()});
}
}
std
::
unordered_map
<
std
::
string
,
std
::
vector
<
details
::
VarHandleBase
*>>
fetched_vars
;
for
(
auto
&
fetch_var_name
:
fetch_tensors
)
{
for
(
auto
&
var_map
:
member_
->
graph_
.
vars_
)
{
auto
it
=
var_map
.
find
(
fetch_var_name
);
if
(
it
!=
var_map
.
end
())
{
fetched_vars
[
fetch_var_name
].
push_back
(
&
it
->
second
.
rbegin
()
->
second
);
}
}
}
std
::
vector
<
details
::
FetchOpHandle
>
fetch_ops
;
for
(
size_t
i
=
0
;
i
<
fetch_tensors
.
size
();
++
i
)
{
auto
&
var_name
=
fetch_tensors
[
i
];
auto
&
vars
=
fetched_vars
[
var_name
];
fetch_ops
.
emplace_back
(
&
fetched_data
,
i
,
&
member_
->
local_scopes_
);
details
::
FetchOpHandle
*
op
=
&
fetch_ops
.
back
();
// FIXME: Use new device context
for
(
auto
&
p
:
member_
->
places_
)
{
op
->
dev_ctx_
[
p
]
=
member_
->
fetch_dev_ctxs_
.
Get
(
p
);
}
for
(
auto
*
var
:
vars
)
{
op
->
AddInput
(
var
);
}
dummy_vars
.
emplace_back
();
auto
*
var
=
&
dummy_vars
.
back
();
op
->
AddOutput
(
var
);
pending_vars
[
var
]
=
false
;
pending_ops
.
insert
({
op
,
op
->
inputs_
.
size
()});
}
for
(
auto
*
op
:
to_run
)
{
member_
->
RunOp
(
use_event
,
pending_vars
,
op
);
}
while
(
!
pending_vars
.
empty
())
{
details
::
VarHandleBase
*
ready_var
=
nullptr
;
for
(
auto
&
pair
:
pending_vars
)
{
if
(
pair
.
second
.
load
(
std
::
memory_order_acquire
))
{
ready_var
=
pair
.
first
;
}
}
if
(
ready_var
==
nullptr
)
{
// FIXME use conditional var instead of busy wait.
if
(
member_
->
exception_
)
{
throw
*
member_
->
exception_
;
}
continue
;
}
pending_vars
.
erase
(
ready_var
);
to_run
.
clear
();
for
(
auto
*
op
:
ready_var
->
pending_ops_
)
{
auto
&
deps
=
pending_ops
[
op
];
--
deps
;
if
(
deps
==
0
)
{
to_run
.
emplace_back
(
op
);
}
}
for
(
auto
*
op
:
to_run
)
{
pending_ops
.
erase
(
op
);
member_
->
RunOp
(
use_event
,
pending_vars
,
op
);
}
}
for
(
auto
&
fetch_op
:
fetch_ops
)
{
fetch_op
.
WaitAndMergeCPUTensors
();
}
*
member_
->
global_scope_
->
Var
(
fetched_var_name
)
->
GetMutable
<
FeedFetchList
>
()
=
fetched_data
;
member_
->
executor_
->
Run
(
member_
->
global_scope_
,
fetch_tensors
,
fetched_var_name
);
}
}
// namespace framework
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录