Commit b53f7e2c (unverified)

Authored on Apr 17, 2018 by Yu Yang; committed via GitHub on Apr 17, 2018.

Merge pull request #9930 from reyoung/feature/simplify_delay_logic

Simplify DelayOps Logic

Parents: 0729ea7d, 4999f85f
Showing 3 changed files with 18 additions and 39 deletions (+18, -39):

paddle/fluid/framework/details/threaded_ssa_graph_executor.cc    +15  -35
paddle/fluid/framework/details/threaded_ssa_graph_executor.h      +0   -2
python/paddle/fluid/tests/unittests/test_parallel_executor.py     +3   -2
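In short, the commit replaces the old two-path scheme (a run_all_ready_ops lambda plus a separate RunDelayedOps pass gated by blocked_by_delayed_ops) with a single run_all_ops helper and one rule: ready ops always run first, and delayed multi-device-transfer ops are drained only when nothing else is ready. The standalone C++ sketch below illustrates that policy only; Op, is_transfer, Launch, becomes_ready, and the pending counter are simplified stand-ins invented for this example, not the actual Paddle types.

```cpp
// Illustrative sketch of the simplified scheduling policy (not Paddle code).
#include <iostream>
#include <queue>
#include <unordered_set>

struct Op {
  const char *name;
  bool is_transfer;  // stand-in for OpHandleBase::IsMultiDeviceTransfer()
  void Launch() const { std::cout << "run " << name << "\n"; }
};

int main() {
  const bool allow_op_delay = true;

  Op fc{"fc", false}, all_reduce{"all_reduce", true}, sgd{"sgd", false};
  std::queue<Op *> becomes_ready;  // ops whose dependencies resolve, in order
  becomes_ready.push(&fc);
  becomes_ready.push(&all_reduce);
  becomes_ready.push(&sgd);

  std::unordered_set<Op *> ready_ops, delayed_ops;
  std::size_t pending = becomes_ready.size();  // stand-in for pending_vars

  // One helper drains whichever set it is given, mirroring run_all_ops.
  auto run_all_ops = [](std::unordered_set<Op *> &set) {
    for (auto *op : set) op->Launch();
    set.clear();
  };

  while (pending > 0) {
    // Delayed ops have lower priority: run them only when nothing is ready.
    if (ready_ops.empty() && allow_op_delay) {
      run_all_ops(delayed_ops);
    } else {
      run_all_ops(ready_ops);
    }

    // Classify the next op whose inputs just became available.
    Op *op = becomes_ready.front();
    becomes_ready.pop();
    --pending;
    if (op->is_transfer && allow_op_delay) {
      delayed_ops.insert(op);
    } else {
      ready_ops.insert(op);
    }
  }
  // Final drain once nothing is pending (toy cleanup, kept simple here).
  run_all_ops(ready_ops);
  run_all_ops(delayed_ops);
}
```

For this toy input it prints run fc, run all_reduce, run sgd; the point is only that the transfer op is held back until the ready set is empty.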
paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
```diff
@@ -33,13 +33,6 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
       running_ops_(0),
       allow_op_delay_(allow_op_delay) {}
 
-void ThreadedSSAGraphExecutor::RunDelayedOps(
-    const std::unordered_set<OpHandleBase *> &delayed_ops) {
-  for (auto op : delayed_ops) {
-    op->Run(use_event_);
-  }
-}
-
 FeedFetchList ThreadedSSAGraphExecutor::Run(
     const std::vector<std::string> &fetch_tensors) {
   std::unordered_map<OpHandleBase *, size_t> pending_ops;
@@ -51,8 +44,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
   // together since we currently cannot overlap computation and memcpy streams.
   // Should revisit it if overlapping is available.
   std::unordered_set<OpHandleBase *> delayed_ops;
-  std::unordered_set<OpHandleBase *> blocked_by_delayed_ops;
-  std::unordered_set<VarHandleBase *> delayed_vars;
 
   auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
     pending_vars.insert(&var);
@@ -122,24 +113,26 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
     InsertPendingOp(*op);
   }
 
-  auto run_all_ready_ops = [&] {
-    for (auto *op : ready_ops) {
-      if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
-        delayed_ops.insert(op);
-        delayed_vars.insert(op->outputs_.begin(), op->outputs_.end());
-        ready_vars.Extend(op->outputs_);
-        continue;
-      }
+  auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
+    for (auto *op : set) {
       running_ops_++;
       RunOp(&ready_vars, op);
     }
-    ready_ops.clear();
+    set.clear();
   };
 
   // Step 3. Execution
-  while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) {
+  while (!pending_vars.empty()) {
     // 1. Run All Ready ops
-    run_all_ready_ops();
+    // Keep loop until all vars are ready.
+    //
+    // NOTE: DelayedOps have a lower priority. It will be scheduled after all
+    // ready_ops have been performed.
+    if (ready_ops.empty() && allow_op_delay_) {
+      run_all_ops(delayed_ops);
+    } else {
+      run_all_ops(ready_ops);
+    }
 
     // 2. Find ready variable
     bool timeout;
@@ -160,29 +153,16 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
         auto &deps = pending_ops[op];
         --deps;
         if (deps == 0) {
-          if (delayed_vars.find(ready_var) != delayed_vars.end()) {
-            blocked_by_delayed_ops.insert(op);
+          if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
+            delayed_ops.insert(op);
           } else {
             ready_ops.insert(op);
           }
         }
       }
     }
-    // When there are no other ops to schedule, schedule buffered delayed
-    // ops and unblock other ops.
-    if (ready_ops.empty() && !delayed_ops.empty() && running_ops_ == 0) {
-      RunDelayedOps(delayed_ops);
-      delayed_ops.clear();
-      for (auto *op : blocked_by_delayed_ops) {
-        ready_ops.insert(op);
-      }
-      blocked_by_delayed_ops.clear();
-    }
-    // Keep loop until all vars are ready.
   }
   PADDLE_ENFORCE(ready_ops.empty());
-  PADDLE_ENFORCE(delayed_ops.empty());
-  PADDLE_ENFORCE(blocked_by_delayed_ops.empty());
 
   // Wait FetchOps.
   if (!fetch_ops.empty()) {
```
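A detail worth spelling out: the loop guard can shrink to while (!pending_vars.empty()) because the new path no longer marks a delayed op's outputs as ready up front (the removed ready_vars.Extend(op->outputs_); did exactly that), so a delayed op keeps its output variables pending until it actually runs, and the separate blocked_by_delayed_ops bookkeeping has nothing left to track. The mini-model below is a hypothetical illustration of that invariant, assuming one output variable per op; MiniOp and unproduced_outputs are invented names, not Paddle types.

```cpp
// Hypothetical mini-model of the invariant, not Paddle code.
#include <cassert>
#include <unordered_set>

struct MiniOp {
  int unproduced_outputs = 1;  // assumption: each op produces one variable
};

int main() {
  MiniOp transfer_op;
  std::unordered_set<MiniOp *> delayed_ops{&transfer_op};
  int pending_vars = transfer_op.unproduced_outputs;

  // While an op merely sits in delayed_ops, its output stays pending, so
  // checking pending_vars alone is enough to keep the scheduling loop alive.
  while (pending_vars > 0) {
    for (auto *op : delayed_ops) pending_vars -= op->unproduced_outputs;  // "run" it
    delayed_ops.clear();
  }
  assert(pending_vars == 0 && delayed_ops.empty());
  return 0;
}
```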
paddle/fluid/framework/details/threaded_ssa_graph_executor.h
```diff
@@ -88,8 +88,6 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
   void RunOp(BlockingQueue<VarHandleBase *> *ready_var_q,
              details::OpHandleBase *op);
 
-  void RunDelayedOps(const std::unordered_set<OpHandleBase *> &delayed_ops);
-
  private:
   std::unique_ptr<::ThreadPool> pool_;
   std::vector<Scope *> local_scopes_;
```
python/paddle/fluid/tests/unittests/test_parallel_executor.py
```diff
@@ -206,18 +206,19 @@ class TestParallelExecutorBase(unittest.TestCase):
                                       feed_dict={}):
         main = fluid.Program()
         startup = fluid.Program()
         startup.random_seed = 1  # Fix random seed
         with fluid.program_guard(main, startup):
             loss = method(use_feed=len(feed_dict) > 0)
             adam = fluid.optimizer.Adam()
             adam.minimize(loss)
             if memory_opt:
                 fluid.memory_optimize(main)
 
         place = fluid.CUDAPlace(0)
         startup_exe = fluid.Executor(place)
         startup_exe.run(startup)
-        exe = fluid.ParallelExecutor(True, loss_name=loss.name)
+        exe = fluid.ParallelExecutor(
+            True, loss_name=loss.name, allow_op_delay=allow_op_delay)
         if batch_size is not None:
             batch_size *= fluid.core.get_cuda_device_count()
         begin = time.time()
```