Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
acef55df
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
acef55df
编写于
4月 23, 2020
作者:
Z
Zeng Jinle
提交者:
GitHub
4月 23, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix isolated var fetch bug, test=develop (#24070)
上级
3ca700a9
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
168 addition
and
0 deletion
+168
-0
paddle/fluid/framework/details/multi_devices_helper.cc
paddle/fluid/framework/details/multi_devices_helper.cc
+26
-0
paddle/fluid/framework/ir/graph.cc
paddle/fluid/framework/ir/graph.cc
+14
-0
paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc
...k/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc
+19
-0
paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h
...rk/ir/multi_devices_graph_pass/multi_devices_graph_pass.h
+2
-0
python/paddle/fluid/tests/unittests/CMakeLists.txt
python/paddle/fluid/tests/unittests/CMakeLists.txt
+1
-0
python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_isolated_var.py
...ts/unittests/test_parallel_executor_fetch_isolated_var.py
+106
-0
未找到文件。
paddle/fluid/framework/details/multi_devices_helper.cc
浏览文件 @
acef55df
...
...
@@ -210,6 +210,32 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs(
g
->
Set
(
kGraphDepVars
,
new
GraphDepVars
());
}
std
::
vector
<
VarHandle
*>
isolated_var_handles
;
for
(
auto
*
node
:
graph
->
Nodes
())
{
if
(
!
node
->
IsWrappedBy
<
VarHandleBase
>
())
{
continue
;
}
auto
&
var_handle_base
=
node
->
Wrapper
<
VarHandleBase
>
();
auto
*
var_handle
=
dynamic_cast
<
VarHandle
*>
(
&
var_handle_base
);
if
(
var_handle
&&
var_handle
->
PendingOps
().
empty
()
&&
var_handle
->
GeneratedOp
()
==
nullptr
)
{
isolated_var_handles
.
emplace_back
(
var_handle
);
}
}
for
(
auto
*
var_handle
:
isolated_var_handles
)
{
auto
dev_idx
=
var_handle
->
scope_idx
();
auto
&
src_vars
=
graph
->
Get
<
GraphVars
>
(
kGraphVars
)[
dev_idx
];
auto
*
dst_graph
=
graphs
[
dev_idx
].
get
();
auto
&
dst_vars
=
dst_graph
->
Get
<
GraphVars
>
(
kGraphVars
)[
0
];
VLOG
(
10
)
<<
"Move isolated var "
<<
var_handle
->
Name
()
<<
" at device "
<<
dev_idx
;
dst_graph
->
AddNode
(
graph
->
RemoveNode
(
var_handle
->
Node
()).
release
());
dst_vars
[
var_handle
->
Name
()].
emplace_back
(
var_handle
);
src_vars
.
erase
(
var_handle
->
Name
());
}
for
(
auto
&
pair
:
op_to_dev_idx
)
{
auto
*
op
=
pair
.
first
;
auto
dev_idx
=
pair
.
second
;
...
...
paddle/fluid/framework/ir/graph.cc
浏览文件 @
acef55df
...
...
@@ -44,11 +44,14 @@ std::map<std::string, std::vector<ir::Node *>> Graph::InitFromProgram(
all_vars
.
emplace
(
var
->
Name
(),
var
);
}
auto
not_visited_vars
=
all_vars
;
for
(
auto
*
op
:
program
.
Block
(
0
).
AllOps
())
{
ir
::
Node
*
node
=
CreateOpNode
(
op
);
// For input args, reuse the same var name if it was created before.
// Otherwise, create a new one.
for
(
auto
&
each_var_name
:
op
->
InputArgumentNames
())
{
not_visited_vars
.
erase
(
each_var_name
);
ir
::
Node
*
var
=
nullptr
;
if
(
var_nodes
.
find
(
each_var_name
)
!=
var_nodes
.
end
())
{
var
=
var_nodes
.
at
(
each_var_name
).
back
();
...
...
@@ -68,6 +71,7 @@ std::map<std::string, std::vector<ir::Node *>> Graph::InitFromProgram(
// For output args, always create a new var.
std
::
unordered_set
<
std
::
string
>
out_arg_set
;
for
(
auto
&
each_var_name
:
op
->
OutputArgumentNames
())
{
not_visited_vars
.
erase
(
each_var_name
);
if
(
each_var_name
!=
kEmptyVarName
)
{
PADDLE_ENFORCE_EQ
(
out_arg_set
.
count
(
each_var_name
),
0
,
platform
::
errors
::
InvalidArgument
(
...
...
@@ -91,6 +95,16 @@ std::map<std::string, std::vector<ir::Node *>> Graph::InitFromProgram(
var
->
inputs
.
push_back
(
node
);
}
}
for
(
auto
&
pair
:
not_visited_vars
)
{
const
auto
&
var_name
=
pair
.
first
;
auto
*
var_desc
=
pair
.
second
;
if
(
var_name
!=
kEmptyVarName
)
{
VLOG
(
10
)
<<
"Create isolated var node "
<<
var_name
;
var_nodes
[
var_name
].
push_back
(
CreateVarNode
(
var_desc
));
}
}
Set
<
const
std
::
vector
<
OpDesc
*>>
(
details
::
kStaleProgramOpDescs
,
new
std
::
vector
<
OpDesc
*>
(
program
.
Block
(
0
).
AllOps
()));
...
...
paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc
浏览文件 @
acef55df
...
...
@@ -174,9 +174,15 @@ void MultiDevSSAGraphBuilderBase::ApplyImpl(ir::Graph *graph) const {
auto
nodes
=
graph
->
ReleaseNodes
();
ir
::
Graph
&
result
=
*
graph
;
std
::
vector
<
ir
::
Node
*>
isolated_vars
;
for
(
auto
&
node
:
nodes
)
{
if
(
node
->
IsVar
()
&&
node
->
Var
())
{
all_vars_
.
emplace
(
node
->
Name
(),
node
->
Var
());
if
(
node
->
inputs
.
empty
()
&&
node
->
outputs
.
empty
())
{
isolated_vars
.
emplace_back
(
node
.
get
());
}
}
}
...
...
@@ -185,6 +191,10 @@ void MultiDevSSAGraphBuilderBase::ApplyImpl(ir::Graph *graph) const {
result
.
Set
(
details
::
kGraphDepVars
,
new
details
::
GraphDepVars
);
result
.
Set
(
kGraphOps
,
new
GraphOps
);
for
(
auto
*
var_node
:
isolated_vars
)
{
CreateIsolatedVarNode
(
&
result
,
var_node
);
}
bool
is_forwarding
=
true
;
for
(
ir
::
Node
*
node
:
sorted_ops
)
{
...
...
@@ -582,6 +592,15 @@ bool MultiDevSSAGraphBuilderBase::IsSparseGradient(
return
all_vars_
.
at
(
og
)
->
GetType
()
==
proto
::
VarType
::
SELECTED_ROWS
;
}
void
MultiDevSSAGraphBuilderBase
::
CreateIsolatedVarNode
(
ir
::
Graph
*
graph
,
ir
::
Node
*
var_node
)
const
{
for
(
size_t
i
=
0
;
i
<
places_
.
size
();
++
i
)
{
VLOG
(
10
)
<<
"Create isolated var node "
<<
var_node
->
Name
()
<<
" at device "
<<
i
;
CreateOrGetLatestVarHandle
(
graph
,
var_node
,
places_
[
i
],
i
);
}
}
void
AllReduceSSAGraphBuilder
::
InsertCollectiveOp
(
ir
::
Graph
*
result
,
const
std
::
string
&
p_name
,
const
std
::
string
&
g_name
)
const
{
...
...
paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h
浏览文件 @
acef55df
...
...
@@ -94,6 +94,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass {
void
CreateOpHandleIOs
(
ir
::
Graph
*
result
,
ir
::
Node
*
node
,
size_t
device_id
)
const
;
void
CreateIsolatedVarNode
(
ir
::
Graph
*
result
,
ir
::
Node
*
var_node
)
const
;
#if defined(PADDLE_WITH_NCCL)
mutable
platform
::
NCCLContextMap
*
nccl_ctxs_
{
nullptr
};
mutable
platform
::
NCCLCommunicator
*
multi_nccl_ctxs_
{
nullptr
};
...
...
python/paddle/fluid/tests/unittests/CMakeLists.txt
浏览文件 @
acef55df
...
...
@@ -369,6 +369,7 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu
test_data_norm_op test_imperative_using_non_zero_gpu test_fuse_bn_act_pass
test_optimizer_in_control_flow test_dataloader_keep_order
test_dataloader_unkeep_order
test_parallel_executor_fetch_isolated_var
test_parallel_executor_inference_feed_partial_data
test_parallel_ssa_graph_inference_feed_partial_data
test_fetch_unmerged
...
...
python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_isolated_var.py
0 → 100644
浏览文件 @
acef55df
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
unittest
import
numpy
as
np
import
six
import
paddle.fluid
as
fluid
def
enable_parallel_ssa_executor
(
enabled
=
True
):
if
fluid
.
is_compiled_with_cuda
():
fluid
.
core
.
globals
()[
'FLAGS_enable_parallel_graph'
]
=
enabled
class
TestParallelExecutorFetchIsolatedVarBase
(
unittest
.
TestCase
):
def
build_network
(
self
,
is_training
):
x
=
fluid
.
data
(
name
=
'x'
,
shape
=
[
-
1
,
10
],
dtype
=
'float32'
)
y
=
fluid
.
data
(
name
=
'y'
,
shape
=
[
-
1
,
10
],
dtype
=
'float32'
)
fc
=
fluid
.
layers
.
fc
(
x
,
size
=
30
)
loss
=
fluid
.
layers
.
reduce_mean
(
fc
)
if
is_training
:
adam
=
fluid
.
optimizer
.
Adam
(
learning_rate
=
1e-3
)
adam
.
minimize
(
loss
)
return
loss
,
y
def
exec_strategy
(
self
,
use_experimental_executor
):
strategy
=
fluid
.
ExecutionStrategy
()
strategy
.
use_experimental_executor
=
use_experimental_executor
return
strategy
def
places
(
self
,
use_gpu
,
dev_cnt
):
if
use_gpu
:
return
fluid
.
cuda_places
(
list
(
range
(
dev_cnt
)))
else
:
return
fluid
.
cpu_places
(
dev_cnt
)
def
test_main
(
self
):
for
use_gpu
in
[
False
,
True
]:
for
dev_cnt
in
[
1
,
2
]:
for
is_training
in
[
False
,
True
]:
for
use_experimental_executor
in
[
False
,
True
]:
for
use_parallel_ssa_executor
in
[
False
,
True
]:
func
=
lambda
:
self
.
run_impl
(
use_gpu
,
dev_cnt
,
is_training
,
use_experimental_executor
,
use_parallel_ssa_executor
)
self
.
run_func_with_guard
(
func
)
def
run_impl
(
self
,
use_gpu
,
dev_cnt
,
is_training
,
use_experimental_executor
,
use_parallel_ssa_executor
):
enable_parallel_ssa_executor
(
use_parallel_ssa_executor
)
if
fluid
.
is_compiled_with_cuda
():
if
fluid
.
core
.
globals
()[
'FLAGS_enable_parallel_graph'
]
and
not
use_gpu
:
return
else
:
if
use_gpu
:
return
loss
,
isolated_var
=
self
.
build_network
(
is_training
)
loss_name
=
loss
.
name
if
is_training
else
None
places
=
self
.
places
(
use_gpu
,
dev_cnt
)
exe
=
fluid
.
Executor
(
places
[
0
])
exe
.
run
(
fluid
.
default_startup_program
())
prog
=
fluid
.
CompiledProgram
(
fluid
.
default_main_program
(
)).
with_data_parallel
(
loss_name
=
loss_name
,
exec_strategy
=
self
.
exec_strategy
(
use_experimental_executor
),
places
=
places
)
BATCH_SIZE
=
8
*
dev_cnt
for
_
in
six
.
moves
.
range
(
10
):
x_np
=
np
.
random
.
random
(
size
=
[
BATCH_SIZE
,
10
]).
astype
(
'float32'
)
y_np
=
np
.
random
.
random
(
size
=
[
BATCH_SIZE
,
10
]).
astype
(
'float32'
)
_
,
y_np_fetch
=
exe
.
run
(
prog
,
feed
=
{
'x'
:
x_np
,
'y'
:
y_np
},
fetch_list
=
[
loss
,
isolated_var
])
self
.
assertTrue
(
np
.
array_equal
(
y_np
,
y_np_fetch
))
enable_parallel_ssa_executor
(
False
)
def
run_func_with_guard
(
self
,
func
):
with
fluid
.
program_guard
(
fluid
.
Program
(),
fluid
.
Program
()):
with
fluid
.
unique_name
.
guard
():
with
fluid
.
scope_guard
(
fluid
.
Scope
()):
func
()
if
__name__
==
'__main__'
:
unittest
.
main
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录