Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
75bfdb3a
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
75bfdb3a
编写于
4月 02, 2018
作者:
T
typhoonzero
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refine
上级
fc4bcdd7
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
26 addition
and
18 deletion
+26
-18
paddle/fluid/framework/executor.cc
paddle/fluid/framework/executor.cc
+1
-1
paddle/fluid/operators/listen_and_serv_op.cc
paddle/fluid/operators/listen_and_serv_op.cc
+25
-17
未找到文件。
paddle/fluid/framework/executor.cc
浏览文件 @
75bfdb3a
...
...
@@ -279,7 +279,7 @@ std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
return
std
::
unique_ptr
<
ExecutorPrepareContext
>
(
ctx
);
}
std
::
vector
<
std
::
shared_ptr
<
ExecutorPrepareContext
>>
Prepare
(
std
::
vector
<
std
::
shared_ptr
<
ExecutorPrepareContext
>>
Executor
::
Prepare
(
const
ProgramDesc
&
program
,
const
std
::
vector
<
int
>&
block_ids
)
{
std
::
vector
<
std
::
shared_ptr
<
ExecutorPrepareContext
>>
result
;
for
(
auto
&
bid
:
block_ids
)
{
...
...
paddle/fluid/operators/listen_and_serv_op.cc
浏览文件 @
75bfdb3a
...
...
@@ -54,20 +54,24 @@ static void CreateTensorFromMessageType(framework::Variable *var,
}
}
static
void
ParallelExecuteBlocks
(
const
std
::
vector
<
size_t
>
&
parallel_blkids
,
framework
::
Executor
*
executor
,
framework
::
ProgramDesc
*
program
,
framework
::
Scope
*
scope
)
{
static
void
ParallelExecuteBlocks
(
const
std
::
vector
<
size_t
>
&
parallel_blkids
,
framework
::
Executor
*
executor
,
const
std
::
vector
<
std
::
shared_ptr
<
framework
::
ExecutorPrepareContext
>>
&
prepared
,
framework
::
ProgramDesc
*
program
,
framework
::
Scope
*
scope
)
{
std
::
vector
<
std
::
future
<
void
>>
fs
;
for
(
size_t
idx
:
parallel_blkids
)
{
fs
.
push_back
(
framework
::
Async
([
&
executor
,
&
program
,
&
scope
,
idx
]()
{
int
run_block
=
idx
;
// thread local
try
{
executor
->
Run
(
*
program
,
scope
,
run_block
,
false
,
false
);
}
catch
(
std
::
exception
&
e
)
{
LOG
(
ERROR
)
<<
"run sub program error "
<<
e
.
what
();
}
}));
fs
.
push_back
(
framework
::
Async
([
&
executor
,
&
prepared
,
&
program
,
&
scope
,
idx
]()
{
int
run_block
=
idx
;
// thread local
try
{
// executor->Run(*program, scope, run_block, false, false);
executor
->
RunPreparedContext
(
prepared
[
run_block
].
get
(),
scope
,
false
,
false
);
}
catch
(
std
::
exception
&
e
)
{
LOG
(
ERROR
)
<<
"run sub program error "
<<
e
.
what
();
}
}));
}
for
(
size_t
i
=
0
;
i
<
fs
.
size
();
++
i
)
fs
[
i
].
wait
();
}
...
...
@@ -105,15 +109,18 @@ class ListenAndServOp : public framework::OperatorBase {
auto
*
block
=
Attr
<
framework
::
BlockDesc
*>
(
kOptimizeBlock
);
auto
*
program
=
block
->
Program
();
in
t
num_blocks
=
program
->
Size
();
size_
t
num_blocks
=
program
->
Size
();
PADDLE_ENFORCE_GE
(
num_blocks
,
2
,
"server program should have at least 2 blocks"
);
framework
::
Executor
executor
(
dev_place
);
std
::
vector
<
int
>
block_list
;
for
(
in
t
blkid
=
1
;
blkid
<
num_blocks
;
++
blkid
)
for
(
size_
t
blkid
=
1
;
blkid
<
num_blocks
;
++
blkid
)
block_list
.
push_back
(
blkid
);
auto
prepared
=
executor
.
Prepare
(
*
program
,
block_list
);
prepared
.
insert
(
prepared
.
begin
(),
std
::
shared_ptr
<
framework
::
ExecutorPrepareContext
>
(
nullptr
));
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool
exit_flag
=
false
;
...
...
@@ -161,21 +168,22 @@ class ListenAndServOp : public framework::OperatorBase {
// The optimize blocks which have the same parent ID would run parallel
// TODO(Yancey1989): need to use ParallelExecutor for future
size
_t
last_parent_blkid
=
program
->
Block
(
1
).
Parent
();
int32
_t
last_parent_blkid
=
program
->
Block
(
1
).
Parent
();
std
::
vector
<
size_t
>
parallel_blkids
;
parallel_blkids
.
push_back
(
1
);
double
ts
=
detail
::
GetTimestamp
();
for
(
size_t
blkid
=
2
;
blkid
<
num_blocks
;
++
blkid
)
{
if
(
program
->
Block
(
blkid
).
Parent
()
!=
last_parent_blkid
)
{
for
(
size_t
idx
:
parallel_blkids
)
VLOG
(
3
)
<<
idx
;
ParallelExecuteBlocks
(
parallel_blkids
,
&
executor
,
program
,
ParallelExecuteBlocks
(
parallel_blkids
,
&
executor
,
pr
epared
,
pr
ogram
,
&
recv_scope
);
parallel_blkids
.
clear
();
last_parent_blkid
=
program
->
Block
(
blkid
).
Parent
();
}
parallel_blkids
.
push_back
(
blkid
);
}
ParallelExecuteBlocks
(
parallel_blkids
,
&
executor
,
program
,
&
recv_scope
);
ParallelExecuteBlocks
(
parallel_blkids
,
&
executor
,
prepared
,
program
,
&
recv_scope
);
VLOG
(
2
)
<<
"run all blocks spent (ms) "
<<
detail
::
GetTimestamp
()
-
ts
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录