Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
4e86c89b
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4e86c89b
编写于
5月 17, 2018
作者:
Y
Yu Yang
提交者:
GitHub
5月 17, 2018
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10620 from reyoung/feature/trainer_by_pe
Draft for train by parallel executor
上级
dbbeccc4
f0475488
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
71 addition
and
28 deletion
+71
-28
python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py
...-level-api/recognize_digits/test_recognize_digits_conv.py
+8
-2
python/paddle/fluid/trainer.py
python/paddle/fluid/trainer.py
+63
-26
未找到文件。
python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py
浏览文件 @
4e86c89b
...
...
@@ -62,7 +62,10 @@ def train(use_cuda, train_program, save_dirname):
optimizer
=
fluid
.
optimizer
.
Adam
(
learning_rate
=
0.001
)
trainer
=
fluid
.
Trainer
(
train_func
=
train_program
,
place
=
place
,
optimizer
=
optimizer
)
train_func
=
train_program
,
place
=
place
,
optimizer
=
optimizer
,
parallel
=
True
)
def
event_handler
(
event
):
if
isinstance
(
event
,
fluid
.
EndEpochEvent
):
...
...
@@ -87,6 +90,9 @@ def train(use_cuda, train_program, save_dirname):
event
.
epoch
+
1
,
float
(
avg_cost
),
float
(
acc
)))
if
math
.
isnan
(
float
(
avg_cost
)):
sys
.
exit
(
"got NaN loss, training failed."
)
elif
isinstance
(
event
,
fluid
.
EndStepEvent
):
print
(
"Step {0}, Epoch {1} Metrics {2}"
.
format
(
event
.
step
,
event
.
epoch
,
map
(
numpy
.
array
,
event
.
metrics
)))
train_reader
=
paddle
.
batch
(
paddle
.
reader
.
shuffle
(
...
...
@@ -131,4 +137,4 @@ def main(use_cuda):
if
__name__
==
'__main__'
:
# for use_cuda in (False, True):
main
(
use_cuda
=
Fals
e
)
main
(
use_cuda
=
Tru
e
)
python/paddle/fluid/trainer.py
浏览文件 @
4e86c89b
...
...
@@ -20,6 +20,7 @@ import data_feeder
import
contextlib
import
io
import
unique_name
import
parallel_executor
# optimizer is same as the parameter of Trainer.__init__. Rename it to opt_module
import
optimizer
as
opt_module
...
...
@@ -48,12 +49,14 @@ class BeginStepEvent(object):
def
__init__
(
self
,
epoch_id
,
step_id
):
self
.
epoch
=
epoch_id
self
.
step
=
step_id
self
.
fetch_metrics
=
True
class
EndStepEvent
(
object
):
def
__init__
(
self
,
epoch_id
,
step_id
):
def
__init__
(
self
,
epoch_id
,
step_id
,
metrics
):
self
.
epoch
=
epoch_id
self
.
step
=
step_id
self
.
metrics
=
metrics
def
check_and_get_place
(
place
):
...
...
@@ -87,12 +90,17 @@ class Trainer(object):
Args:
train_func(callable): A function which will return loss. The loss must be a scalar.
infer_func(callable): A function which will return predict, used to save inference model
optimizer(optimizer.Optimizer): The optimizer should be an instance of Optimizer
place: The device place of this trainer.
"""
def
__init__
(
self
,
train_func
,
optimizer
,
param_path
=
None
,
place
=
None
):
def
__init__
(
self
,
train_func
,
optimizer
,
param_path
=
None
,
place
=
None
,
parallel
=
False
):
self
.
parallel
=
parallel
# 1. we need to generate a framework.Program by calling
# program_func. Reference: fluid.program_guard in
# test_word2vec.py
...
...
@@ -106,14 +114,14 @@ class Trainer(object):
with
framework
.
program_guard
(
self
.
train_program
,
self
.
startup_program
):
program_func_outs
=
train_func
()
self
.
t
est
_outputs
=
program_func_outs
if
isinstance
(
self
.
t
rain_func
_outputs
=
program_func_outs
if
isinstance
(
program_func_outs
,
list
)
else
[
program_func_outs
]
self
.
test_program
=
self
.
train_program
.
clone
()
if
not
isinstance
(
optimizer
,
opt_module
.
Optimizer
):
raise
TypeError
(
"The optimizer should be an instance of Optimizer"
)
# The fisrt element of program_func_outs is loss.
loss
=
self
.
t
est
_outputs
[
0
]
loss
=
self
.
t
rain_func
_outputs
[
0
]
optimize_ops
,
params_grads
=
optimizer
.
minimize
(
loss
)
self
.
place
=
check_and_get_place
(
place
)
...
...
@@ -202,12 +210,7 @@ class Trainer(object):
'TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
)
def
train
(
self
,
num_epochs
,
event_handler
,
reader
,
feed_order
,
parallel
=
False
):
def
train
(
self
,
num_epochs
,
event_handler
,
reader
=
None
,
feed_order
=
None
):
"""
Train the model.
...
...
@@ -215,25 +218,24 @@ class Trainer(object):
num_epochs: The number of epoch. An epoch will process all data in reader
event_handler: The event handler. A function with type (ev:Event)->void
reader:
parallel: True if use multi-CPUs or multi-GPUs
feed_order: Feeding order of reader. None will following the defining
order in program
Returns:
"""
if
parallel
:
raise
NotImplementedError
(
"Parallel Executor version of trainer is not implemented"
)
training_role
=
os
.
getenv
(
"PADDLE_TRAINING_ROLE"
,
""
)
if
training_role
==
"PSERVER"
:
with
self
.
_prog_and_scope_guard
():
exe
=
executor
.
Executor
(
self
.
place
)
exe
.
run
()
return
self
.
_train_by_executor
(
num_epochs
,
event_handler
,
reader
,
feed_order
)
if
self
.
parallel
:
self
.
_train_by_parallel_executor
(
num_epochs
,
event_handler
,
reader
,
feed_order
)
else
:
self
.
_train_by_executor
(
num_epochs
,
event_handler
,
reader
,
feed_order
)
def
test
(
self
,
reader
,
feed_order
):
"""
...
...
@@ -245,7 +247,8 @@ class Trainer(object):
order in program
"""
return
self
.
_test_by_executor
(
reader
,
feed_order
,
self
.
test_outputs
)
return
self
.
_test_by_executor
(
reader
,
feed_order
,
self
.
train_func_outputs
)
def
save_params
(
self
,
param_path
):
# reference: save_persistables in io.py
...
...
@@ -279,13 +282,25 @@ class Trainer(object):
feeder
=
data_feeder
.
DataFeeder
(
feed_list
=
feed_var_list
,
place
=
self
.
place
)
exe
=
executor
.
Executor
(
self
.
place
)
for
epoch_id
in
range
(
num_epochs
):
event_handler
(
BeginEpochEvent
(
epoch_id
))
for
step_id
,
data
in
enumerate
(
reader
()):
event_handler
(
BeginStepEvent
(
epoch_id
,
step_id
))
exe
.
run
(
feed
=
feeder
.
feed
(
data
),
fetch_list
=
[])
event_handler
(
EndStepEvent
(
epoch_id
,
step_id
))
event_handler
(
EndEpochEvent
(
epoch_id
))
reader
=
feeder
.
decorate_reader
(
reader
,
multi_devices
=
False
)
self
.
_train_by_any_executor
(
event_handler
,
exe
,
num_epochs
,
reader
)
def
_train_by_any_executor
(
self
,
event_handler
,
exe
,
num_epochs
,
reader
):
for
epoch_id
in
range
(
num_epochs
):
event_handler
(
BeginEpochEvent
(
epoch_id
))
for
step_id
,
data
in
enumerate
(
reader
()):
begin_event
=
BeginStepEvent
(
epoch_id
,
step_id
)
event_handler
(
begin_event
)
if
begin_event
.
fetch_metrics
:
metrics
=
exe
.
run
(
feed
=
data
,
fetch_list
=
[
var
.
name
for
var
in
self
.
train_func_outputs
])
else
:
metrics
=
exe
.
run
(
feed
=
data
,
fetch_list
=
[])
event_handler
(
EndStepEvent
(
epoch_id
,
step_id
,
metrics
))
event_handler
(
EndEpochEvent
(
epoch_id
))
def
_test_by_executor
(
self
,
reader
,
feed_order
,
fetch_list
):
with
executor
.
scope_guard
(
self
.
scope
):
...
...
@@ -304,6 +319,28 @@ class Trainer(object):
return
[
x
/
count
for
x
in
accumulated
]
def
_train_by_parallel_executor
(
self
,
num_epochs
,
event_handler
,
reader
,
feed_order
):
with
self
.
_prog_and_scope_guard
():
pe
=
self
.
_get_or_create_parallel_executor
()
feed_var_list
=
build_feed_var_list
(
self
.
train_program
,
feed_order
)
feeder
=
data_feeder
.
DataFeeder
(
feed_list
=
feed_var_list
,
place
=
self
.
place
)
reader
=
feeder
.
decorate_reader
(
reader
,
multi_devices
=
True
)
for
epoch_id
in
range
(
num_epochs
):
self
.
_train_by_any_executor
(
event_handler
,
pe
,
num_epochs
,
reader
)
def
_get_parallel_executor
(
self
):
return
getattr
(
self
,
'parallel_executor'
,
None
)
def
_get_or_create_parallel_executor
(
self
):
if
self
.
_get_parallel_executor
()
is
None
:
self
.
parallel_executor
=
parallel_executor
.
ParallelExecutor
(
use_cuda
=
isinstance
(
self
.
place
,
core
.
CUDAPlace
),
loss_name
=
self
.
train_func_outputs
[
0
].
name
)
return
self
.
_get_parallel_executor
()
def
build_feed_var_list
(
program
,
feed_order
):
if
not
isinstance
(
program
,
framework
.
Program
):
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录