Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Paddle
提交
043c2308
P
Paddle
项目概览
PaddlePaddle
/
Paddle
1 年多 前同步成功
通知
2302
Star
20931
Fork
5422
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1423
列表
看板
标记
里程碑
合并请求
543
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1,423
Issue
1,423
列表
看板
标记
里程碑
合并请求
543
合并请求
543
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
043c2308
编写于
4月 04, 2018
作者:
X
Xin Pan
提交者:
GitHub
4月 04, 2018
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9637 from panyx0718/feed
Add feed support for ParallelExecutor
上级
f8dd03dc
bdea5bee
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
102 addition
and
39 deletion
+102
-39
paddle/fluid/framework/lod_tensor.h
paddle/fluid/framework/lod_tensor.h
+1
-0
paddle/fluid/framework/parallel_executor.cc
paddle/fluid/framework/parallel_executor.cc
+19
-2
paddle/fluid/framework/parallel_executor.h
paddle/fluid/framework/parallel_executor.h
+5
-1
python/paddle/fluid/parallel_executor.py
python/paddle/fluid/parallel_executor.py
+31
-10
python/paddle/fluid/tests/unittests/test_parallel_executor.py
...on/paddle/fluid/tests/unittests/test_parallel_executor.py
+46
-26
未找到文件。
paddle/fluid/framework/lod_tensor.h
浏览文件 @
043c2308
...
...
@@ -142,6 +142,7 @@ class LoDTensor : public Tensor {
return
(
lod_
)[
level
].
size
()
-
1
;
}
// Split LoDTensor and copy to each place specified in places.
std
::
vector
<
LoDTensor
>
SplitLoDTensor
(
const
std
::
vector
<
platform
::
Place
>
places
)
const
;
...
...
paddle/fluid/framework/parallel_executor.cc
浏览文件 @
043c2308
...
...
@@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif
}
void
ParallelExecutor
::
Run
(
const
std
::
vector
<
std
::
string
>
&
fetch_tensors
,
const
std
::
string
&
fetched_var_name
)
{
void
ParallelExecutor
::
Run
(
const
std
::
vector
<
std
::
string
>
&
fetch_tensors
,
const
std
::
string
&
fetched_var_name
,
const
std
::
unordered_map
<
std
::
string
,
LoDTensor
>
&
feed_tensors
)
{
platform
::
RecordBlock
b
(
0
);
SplitTensorToPlaces
(
feed_tensors
);
auto
fetch_data
=
member_
->
executor_
->
Run
(
fetch_tensors
);
*
member_
->
global_scope_
->
Var
(
fetched_var_name
)
->
GetMutable
<
FeedFetchList
>
()
=
fetch_data
;
}
void
ParallelExecutor
::
SplitTensorToPlaces
(
const
std
::
unordered_map
<
std
::
string
,
LoDTensor
>
&
feed_tensors
)
{
for
(
auto
it
:
feed_tensors
)
{
auto
lod_tensors
=
it
.
second
.
SplitLoDTensor
(
member_
->
places_
);
for
(
size_t
j
=
0
;
j
<
member_
->
places_
.
size
();
++
j
)
{
// TODO(panxy0718): Do I need to delete this var?
member_
->
local_scopes_
[
j
]
->
Var
(
it
.
first
)
->
GetMutable
<
LoDTensor
>
()
->
ShareDataWith
(
lod_tensors
[
j
]);
}
}
}
}
// namespace framework
}
// namespace paddle
paddle/fluid/framework/parallel_executor.h
浏览文件 @
043c2308
...
...
@@ -42,9 +42,13 @@ class ParallelExecutor {
bool
allow_op_delay
);
void
Run
(
const
std
::
vector
<
std
::
string
>&
fetch_tensors
,
const
std
::
string
&
fetched_var_name
=
"fetched_var"
);
const
std
::
string
&
fetched_var_name
,
const
std
::
unordered_map
<
std
::
string
,
LoDTensor
>&
feed_tensors
);
private:
void
SplitTensorToPlaces
(
const
std
::
unordered_map
<
std
::
string
,
LoDTensor
>&
feed_tensors
);
ParallelExecutorPrivate
*
member_
;
void
BCastParamsToGPUs
(
const
ProgramDesc
&
startup_program
)
const
;
...
...
python/paddle/fluid/parallel_executor.py
浏览文件 @
043c2308
...
...
@@ -26,25 +26,29 @@ class ParallelExecutor(object):
use_cuda
,
num_threads
=
None
,
allow_op_delay
=
False
):
places
=
[]
self
.
_places
=
[]
self
.
_act_places
=
[]
if
use_cuda
:
for
i
in
xrange
(
core
.
get_cuda_device_count
()):
p
=
core
.
Place
()
p
.
set_place
(
core
.
CUDAPlace
(
i
))
places
.
append
(
p
)
self
.
_act_places
.
append
(
core
.
CUDAPlace
(
i
))
p
.
set_place
(
self
.
_act_places
[
-
1
])
self
.
_places
.
append
(
p
)
else
:
for
i
in
xrange
(
multiprocessing
.
cpu_count
()):
p
=
core
.
Place
()
p
.
set_place
(
core
.
CPUPlace
())
places
.
append
(
p
)
self
.
_act_places
.
append
(
core
.
CPUPlace
(
i
))
p
.
set_place
(
self
.
_act_places
[
-
1
])
self
.
_places
.
append
(
p
)
assert
self
.
_places
,
"no place for execution"
if
num_threads
is
None
:
if
use_cuda
:
# Experiments on se-resnext shows that too many threads hurt
# performance. Worth tunning for other models in the future.
num_threads
=
len
(
places
)
num_threads
=
len
(
self
.
_
places
)
else
:
min
(
len
(
places
)
*
2
,
multiprocessing
.
cpu_count
())
min
(
len
(
self
.
_
places
)
*
2
,
multiprocessing
.
cpu_count
())
startup
=
framework
.
default_startup_program
()
main
=
framework
.
default_main_program
()
...
...
@@ -53,7 +57,7 @@ class ParallelExecutor(object):
self
.
executor
=
core
.
ParallelExecutor
(
num_threads
,
True
if
use_cuda
else
False
,
# use_event
places
,
self
.
_
places
,
set
([
p
.
name
for
p
in
main
.
global_block
().
iter_parameters
()
if
not
p
.
stop_gradient
...
...
@@ -65,8 +69,25 @@ class ParallelExecutor(object):
allow_op_delay
)
self
.
scope
=
scope
def
run
(
self
,
fetch_list
):
def
run
(
self
,
fetch_list
,
feed_dict
=
{}):
"""
:param fetch_list: A list of variable names that will be fetched.
:param feed_dict: A dict mapping for feed variable name to LoDTensor
or numpy array.
:return: fetched value list.
"""
if
not
isinstance
(
feed_dict
,
dict
):
raise
TypeError
(
"feed_dict should be a dict"
)
feed_tensor_dict
=
{}
for
i
,
feed_name
in
enumerate
(
feed_dict
):
feed_tensor
=
feed_dict
[
feed_name
]
if
not
isinstance
(
feed_tensor
,
core
.
LoDTensor
):
feed_tensor
=
core
.
LoDTensor
()
feed_tensor
.
set
(
feed_dict
[
feed_name
],
self
.
_act_places
[
0
])
feed_tensor_dict
[
feed_name
]
=
feed_tensor
fetch_var_name
=
'@FETCHED_VAR_NAME@'
self
.
executor
.
run
(
fetch_list
,
fetch_var_name
)
self
.
executor
.
run
(
fetch_list
,
fetch_var_name
,
feed_tensor_dict
)
arr
=
self
.
scope
.
find_var
(
fetch_var_name
).
get_lod_tensor_array
()
return
[
arr
[
i
]
for
i
in
range
(
len
(
arr
))]
python/paddle/fluid/tests/unittests/test_parallel_executor.py
浏览文件 @
043c2308
...
...
@@ -21,13 +21,17 @@ import paddle.dataset.mnist as mnist
import
paddle.dataset.wmt16
as
wmt16
def
simple_fc_net
():
reader
=
fluid
.
layers
.
open_recordio_file
(
filename
=
'./mnist.recordio'
,
shapes
=
[[
-
1
,
784
],
[
-
1
,
1
]],
lod_levels
=
[
0
,
0
],
dtypes
=
[
'float32'
,
'int64'
])
img
,
label
=
fluid
.
layers
.
read_file
(
reader
)
def
simple_fc_net
(
use_feed
):
if
use_feed
:
img
=
fluid
.
layers
.
data
(
name
=
'image'
,
shape
=
[
784
],
dtype
=
'float32'
)
label
=
fluid
.
layers
.
data
(
name
=
'label'
,
shape
=
[
1
],
dtype
=
'int64'
)
else
:
reader
=
fluid
.
layers
.
open_recordio_file
(
filename
=
'./mnist.recordio'
,
shapes
=
[[
-
1
,
784
],
[
-
1
,
1
]],
lod_levels
=
[
0
,
0
],
dtypes
=
[
'float32'
,
'int64'
])
img
,
label
=
fluid
.
layers
.
read_file
(
reader
)
hidden
=
img
for
_
in
xrange
(
4
):
hidden
=
fluid
.
layers
.
fc
(
...
...
@@ -42,13 +46,18 @@ def simple_fc_net():
return
loss
def
fc_with_batchnorm
():
reader
=
fluid
.
layers
.
open_recordio_file
(
filename
=
'./mnist.recordio'
,
shapes
=
[[
-
1
,
784
],
[
-
1
,
1
]],
lod_levels
=
[
0
,
0
],
dtypes
=
[
'float32'
,
'int64'
])
img
,
label
=
fluid
.
layers
.
read_file
(
reader
)
def
fc_with_batchnorm
(
use_feed
):
if
use_feed
:
img
=
fluid
.
layers
.
data
(
name
=
'image'
,
shape
=
[
784
],
dtype
=
'float32'
)
label
=
fluid
.
layers
.
data
(
name
=
'label'
,
shape
=
[
1
],
dtype
=
'int64'
)
else
:
reader
=
fluid
.
layers
.
open_recordio_file
(
filename
=
'./mnist.recordio'
,
shapes
=
[[
-
1
,
784
],
[
-
1
,
1
]],
lod_levels
=
[
0
,
0
],
dtypes
=
[
'float32'
,
'int64'
])
img
,
label
=
fluid
.
layers
.
read_file
(
reader
)
hidden
=
img
for
_
in
xrange
(
1
):
hidden
=
fluid
.
layers
.
fc
(
...
...
@@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
return
fluid
.
layers
.
elementwise_add
(
x
=
short
,
y
=
scale
,
act
=
'relu'
)
def
SE_ResNeXt152Small
(
batch_size
=
2
):
def
SE_ResNeXt152Small
(
batch_size
=
2
,
use_feed
=
False
):
assert
not
use_feed
,
"SE_ResNeXt doesn't support feed yet"
img
=
fluid
.
layers
.
fill_constant
(
shape
=
[
batch_size
,
3
,
224
,
224
],
dtype
=
'float32'
,
value
=
0.0
)
label
=
fluid
.
layers
.
fill_constant
(
...
...
@@ -185,30 +196,28 @@ class TestParallelExecutorBase(unittest.TestCase):
memory_opt
=
True
,
iter
=
10
,
batch_size
=
None
,
allow_op_delay
=
False
):
allow_op_delay
=
False
,
feed_dict
=
{}):
main
=
fluid
.
Program
()
startup
=
fluid
.
Program
()
with
fluid
.
program_guard
(
main
,
startup
):
loss
=
method
()
loss
=
method
(
use_feed
=
len
(
feed_dict
)
>
0
)
adam
=
fluid
.
optimizer
.
Adam
()
adam
.
minimize
(
loss
)
if
memory_opt
:
fluid
.
memory_optimize
(
main
)
exe
=
fluid
.
ParallelExecutor
(
loss_name
=
loss
.
name
,
use_cuda
=
True
,
allow_op_delay
=
allow_op_delay
)
exe
=
fluid
.
ParallelExecutor
(
loss_name
=
loss
.
name
,
use_cuda
=
True
)
if
batch_size
is
not
None
:
batch_size
*=
fluid
.
core
.
get_cuda_device_count
()
begin
=
time
.
time
()
first_loss
,
=
exe
.
run
([
loss
.
name
])
first_loss
,
=
exe
.
run
([
loss
.
name
]
,
feed_dict
=
feed_dict
)
first_loss
=
numpy
.
array
(
first_loss
)
for
i
in
xrange
(
iter
):
exe
.
run
([])
exe
.
run
([]
,
feed_dict
=
feed_dict
)
last_loss
,
=
exe
.
run
([
loss
.
name
])
last_loss
,
=
exe
.
run
([
loss
.
name
]
,
feed_dict
=
feed_dict
)
end
=
time
.
time
()
if
batch_size
is
not
None
:
...
...
@@ -242,9 +251,19 @@ class TestMNIST(TestParallelExecutorBase):
self
.
check_network_convergence
(
simple_fc_net
)
self
.
check_network_convergence
(
simple_fc_net
,
allow_op_delay
=
True
)
img
=
numpy
.
zeros
(
shape
=
[
32
,
784
],
dtype
=
'float32'
)
label
=
numpy
.
ones
(
shape
=
[
32
,
1
],
dtype
=
'int64'
)
self
.
check_network_convergence
(
simple_fc_net
,
feed_dict
=
{
"image"
:
img
,
"label"
:
label
})
def
test_batchnorm_fc
(
self
):
self
.
check_network_convergence
(
fc_with_batchnorm
)
self
.
check_network_convergence
(
fc_with_batchnorm
,
allow_op_delay
=
True
)
img
=
numpy
.
zeros
(
shape
=
[
32
,
784
],
dtype
=
'float32'
)
label
=
numpy
.
ones
(
shape
=
[
32
,
1
],
dtype
=
'int64'
)
self
.
check_network_convergence
(
fc_with_batchnorm
,
feed_dict
=
{
"image"
:
img
,
"label"
:
label
})
class
TestResnet
(
TestParallelExecutorBase
):
...
...
@@ -400,7 +419,8 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
import
transformer_model
def
transformer
():
def
transformer
(
use_feed
):
assert
not
use_feed
,
"transfomer doesn't support feed yet"
return
transformer_model
.
transformer
(
ModelHyperParams
.
src_vocab_size
+
1
,
ModelHyperParams
.
trg_vocab_size
+
1
,
ModelHyperParams
.
max_length
+
1
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录