Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
PaddleRec
提交
3eba3369
P
PaddleRec
项目概览
BaiXuePrincess
/
PaddleRec
与 Fork 源项目一致
Fork自
PaddlePaddle / PaddleRec
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleRec
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3eba3369
编写于
4月 16, 2020
作者:
T
tangwei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix import
上级
fbbc7134
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
142 addition
and
3 deletion
+142
-3
fleetrec/core/engine/local_mpi_engine.py
fleetrec/core/engine/local_mpi_engine.py
+3
-3
fleetrec/core/trainers/ctr_coding_trainer.py
fleetrec/core/trainers/ctr_coding_trainer.py
+139
-0
未找到文件。
fleetrec/core/engine/local_mpi_engine.py
浏览文件 @
3eba3369
...
...
@@ -34,8 +34,8 @@ class LocalMPIEngine(Engine):
log_fns
=
[]
factory
=
"fleetrec.core.factory"
mpi_
cmd
=
"mpirun -npernode 2 -timestamp-output -tag-output"
.
split
(
" "
)
cmd
=
mpi_cmd
.
extend
([
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
])
cmd
=
"mpirun -npernode 2 -timestamp-output -tag-output"
.
split
(
" "
)
cmd
.
extend
([
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
])
if
logs_dir
is
not
None
:
os
.
system
(
"mkdir -p {}"
.
format
(
logs_dir
))
...
...
@@ -49,7 +49,7 @@ class LocalMPIEngine(Engine):
for
i
in
range
(
len
(
procs
)):
if
len
(
log_fns
)
>
0
:
log_fns
[
i
].
close
()
procs
[
i
].
terminate
()
procs
[
i
].
wait
()
print
(
"all workers and parameter servers already completed"
,
file
=
sys
.
stderr
)
def
run
(
self
):
...
...
fleetrec/core/trainers/ctr_coding_trainer.py
0 → 100755
浏览文件 @
3eba3369
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
os
import
sys
import
time
import
json
import
datetime
import
numpy
as
np
import
paddle.fluid
as
fluid
from
paddle.fluid.incubate.fleet.parameter_server.pslib
import
fleet
from
paddle.fluid.incubate.fleet.base.role_maker
import
MPISymetricRoleMaker
from
fleetrec.core.utils
import
envs
from
fleetrec.core.trainer
import
Trainer
class
CtrPaddleTrainer
(
Trainer
):
"""R
"""
def
__init__
(
self
,
config
):
"""R
"""
Trainer
.
__init__
(
self
,
config
)
self
.
global_config
=
config
self
.
_metrics
=
{}
self
.
processor_register
()
def
processor_register
(
self
):
role
=
MPISymetricRoleMaker
()
fleet
.
init
(
role
)
if
fleet
.
is_server
():
self
.
regist_context_processor
(
'uninit'
,
self
.
instance
)
self
.
regist_context_processor
(
'init_pass'
,
self
.
init
)
self
.
regist_context_processor
(
'server_pass'
,
self
.
server
)
else
:
self
.
regist_context_processor
(
'uninit'
,
self
.
instance
)
self
.
regist_context_processor
(
'init_pass'
,
self
.
init
)
self
.
regist_context_processor
(
'train_pass'
,
self
.
train
)
self
.
regist_context_processor
(
'terminal_pass'
,
self
.
terminal
)
def
_get_dataset
(
self
):
namespace
=
"train.reader"
inputs
=
self
.
model
.
get_inputs
()
threads
=
envs
.
get_global_env
(
"train.threads"
,
None
)
batch_size
=
envs
.
get_global_env
(
"batch_size"
,
None
,
namespace
)
reader_class
=
envs
.
get_global_env
(
"class"
,
None
,
namespace
)
abs_dir
=
os
.
path
.
dirname
(
os
.
path
.
abspath
(
__file__
))
reader
=
os
.
path
.
join
(
abs_dir
,
'../utils'
,
'reader_instance.py'
)
pipe_cmd
=
"python {} {} {} {}"
.
format
(
reader
,
reader_class
,
"TRAIN"
,
self
.
_config
)
train_data_path
=
envs
.
get_global_env
(
"train_data_path"
,
None
,
namespace
)
dataset
=
fluid
.
DatasetFactory
().
create_dataset
()
dataset
.
set_use_var
(
inputs
)
dataset
.
set_pipe_command
(
pipe_cmd
)
dataset
.
set_batch_size
(
batch_size
)
dataset
.
set_thread
(
threads
)
file_list
=
[
os
.
path
.
join
(
train_data_path
,
x
)
for
x
in
os
.
listdir
(
train_data_path
)
]
dataset
.
set_filelist
(
file_list
)
return
dataset
def
instance
(
self
,
context
):
models
=
envs
.
get_global_env
(
"train.model.models"
)
model_class
=
envs
.
lazy_instance
(
models
,
"Model"
)
self
.
model
=
model_class
(
None
)
context
[
'status'
]
=
'init_pass'
def
init
(
self
,
context
):
"""R
"""
self
.
model
.
train_net
()
optimizer
=
self
.
model
.
optimizer
()
optimizer
=
fleet
.
distributed_optimizer
(
optimizer
,
strategy
=
{
"use_cvm"
:
False
})
optimizer
.
minimize
(
self
.
model
.
get_cost_op
())
if
fleet
.
is_server
():
context
[
'status'
]
=
'server_pass'
else
:
self
.
fetch_vars
=
[]
self
.
fetch_alias
=
[]
self
.
fetch_period
=
self
.
model
.
get_fetch_period
()
metrics
=
self
.
model
.
get_metrics
()
if
metrics
:
self
.
fetch_vars
=
metrics
.
values
()
self
.
fetch_alias
=
metrics
.
keys
()
context
[
'status'
]
=
'train_pass'
def
server
(
self
,
context
):
fleet
.
run_server
()
context
[
'is_exit'
]
=
True
def
train
(
self
,
context
):
self
.
_exe
.
run
(
fluid
.
default_startup_program
())
fleet
.
init_worker
()
dataset
=
self
.
_get_dataset
()
shuf
=
np
.
array
([
fleet
.
worker_index
()])
gs
=
shuf
*
0
fleet
.
_role_maker
.
_node_type_comm
.
Allreduce
(
shuf
,
gs
)
print
(
"trainer id: {}, trainers: {}, gs: {}"
.
format
(
fleet
.
worker_index
(),
fleet
.
worker_num
(),
gs
))
epochs
=
envs
.
get_global_env
(
"train.epochs"
)
for
i
in
range
(
epochs
):
self
.
_exe
.
train_from_dataset
(
program
=
fluid
.
default_main_program
(),
dataset
=
dataset
,
fetch_list
=
self
.
fetch_vars
,
fetch_info
=
self
.
fetch_alias
,
print_period
=
self
.
fetch_period
)
context
[
'status'
]
=
'terminal_pass'
fleet
.
stop_worker
()
def
terminal
(
self
,
context
):
print
(
"terminal ended."
)
context
[
'is_exit'
]
=
True
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录