Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
PaddleRec
提交
0a4ba25e
P
PaddleRec
项目概览
BaiXuePrincess
/
PaddleRec
与 Fork 源项目一致
Fork自
PaddlePaddle / PaddleRec
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleRec
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
0a4ba25e
编写于
9月 25, 2020
作者:
Y
yinhaofeng
提交者:
GitHub
9月 25, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'master' into deepfm
上级
21179e1e
8fd5aeda
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
540 addition
and
120 deletion
+540
-120
core/engine/cluster/cluster.py
core/engine/cluster/cluster.py
+38
-2
core/engine/cluster_utils.py
core/engine/cluster_utils.py
+324
-0
core/engine/local_cluster.py
core/engine/local_cluster.py
+63
-30
models/multitask/mmoe/config.yaml
models/multitask/mmoe/config.yaml
+2
-0
models/rank/dnn/README.md
models/rank/dnn/README.md
+103
-81
models/rank/dnn/data/get_slot_data.py
models/rank/dnn/data/get_slot_data.py
+1
-2
models/recall/word2vec/README.md
models/recall/word2vec/README.md
+6
-3
models/recall/youtube_dnn/README.md
models/recall/youtube_dnn/README.md
+2
-2
run.py
run.py
+1
-0
未找到文件。
core/engine/cluster/cluster.py
浏览文件 @
0a4ba25e
...
@@ -19,10 +19,16 @@ import copy
...
@@ -19,10 +19,16 @@ import copy
import
os
import
os
import
subprocess
import
subprocess
import
warnings
import
warnings
import
sys
import
logging
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.factory
import
TrainerFactory
from
paddlerec.core.factory
import
TrainerFactory
from
paddlerec.core.utils
import
envs
from
paddlerec.core.utils
import
envs
import
paddlerec.core.engine.cluster_utils
as
cluster_utils
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
ClusterEngine
(
Engine
):
class
ClusterEngine
(
Engine
):
...
@@ -47,8 +53,38 @@ class ClusterEngine(Engine):
...
@@ -47,8 +53,38 @@ class ClusterEngine(Engine):
self
.
backend
))
self
.
backend
))
def
start_worker_procs
(
self
):
def
start_worker_procs
(
self
):
trainer
=
TrainerFactory
.
create
(
self
.
trainer
)
if
(
envs
.
get_runtime_environ
(
"fleet_mode"
)
==
"COLLECTIVE"
):
trainer
.
run
()
#trainer_ports = os.getenv("TRAINER_PORTS", None).split(",")
cuda_visible_devices
=
os
.
getenv
(
"CUDA_VISIBLE_DEVICES"
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
selected_gpus
=
range
(
int
(
os
.
getenv
(
"TRAINER_GPU_CARD_COUNT"
)))
else
:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list
=
cuda_visible_devices
.
split
(
','
)
for
x
in
range
(
int
(
os
.
getenv
(
"TRAINER_GPU_CARD_COUNT"
))):
assert
x
in
cuda_visible_devices_list
,
"Can't find "
\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."
\
%
(
x
,
cuda_visible_devices
)
selected_gpus
=
[
cuda_visible_devices_list
.
index
(
x
)]
print
(
"selected_gpus:{}"
.
format
(
selected_gpus
))
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
logs_dir
=
envs
.
get_runtime_environ
(
"log_dir"
)
print
(
"use_paddlecloud_flag:{}"
.
format
(
cluster_utils
.
use_paddlecloud
()))
if
cluster_utils
.
use_paddlecloud
():
cluster
,
pod
=
cluster_utils
.
get_cloud_cluster
(
selected_gpus
)
logger
.
info
(
"get cluster from cloud:{}"
.
format
(
cluster
))
procs
=
cluster_utils
.
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
logs_dir
)
print
(
"cluster:{}"
.
format
(
cluster
))
print
(
"pod:{}"
.
format
(
pod
))
else
:
trainer
=
TrainerFactory
.
create
(
self
.
trainer
)
trainer
.
run
()
def
start_master_procs
(
self
):
def
start_master_procs
(
self
):
if
self
.
backend
==
"PADDLECLOUD"
:
if
self
.
backend
==
"PADDLECLOUD"
:
...
...
core/engine/cluster_utils.py
0 → 100644
浏览文件 @
0a4ba25e
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
functools
import
logging
import
socket
import
time
import
os
import
signal
import
copy
import
sys
import
subprocess
from
contextlib
import
closing
import
socket
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
Cluster
(
object
):
def
__init__
(
self
,
hdfs
):
self
.
job_server
=
None
self
.
pods
=
[]
self
.
hdfs
=
None
self
.
job_stage_flag
=
None
def
__str__
(
self
):
return
"job_server:{} pods:{} job_stage_flag:{} hdfs:{}"
.
format
(
self
.
job_server
,
[
str
(
pod
)
for
pod
in
self
.
pods
],
self
.
job_stage_flag
,
self
.
hdfs
)
def
__eq__
(
self
,
cluster
):
if
len
(
self
.
pods
)
!=
len
(
cluster
.
pods
):
return
False
for
a
,
b
in
zip
(
self
.
pods
,
cluster
.
pods
):
if
a
!=
b
:
return
False
if
self
.
job_stage_flag
!=
cluster
.
job_stage_flag
:
return
False
return
True
def
__ne__
(
self
,
cluster
):
return
not
self
.
__eq__
(
cluster
)
def
update_pods
(
cluster
):
self
.
pods
=
copy
.
copy
(
cluster
.
pods
)
def
trainers_nranks
(
self
):
return
len
(
self
.
trainers_endpoints
())
def
pods_nranks
(
self
):
return
len
(
self
.
pods
)
def
trainers_endpoints
(
self
):
r
=
[]
for
pod
in
self
.
pods
:
for
t
in
pod
.
trainers
:
r
.
append
(
t
.
endpoint
)
return
r
def
pods_endpoints
(
self
):
r
=
[]
for
pod
in
self
.
pods
:
ep
=
"{}:{}"
.
format
(
pod
.
addr
,
pod
.
port
)
assert
pod
.
port
!=
None
and
pod
.
addr
!=
None
,
"{} not a valid endpoint"
.
format
(
ep
)
r
.
append
(
ep
)
return
r
def
get_pod_by_id
(
self
,
pod_id
):
for
pod
in
self
.
pods
:
if
str
(
pod_id
)
==
str
(
pod
.
id
):
return
pod
return
None
class
JobServer
(
object
):
def
__init__
(
self
):
self
.
endpoint
=
None
def
__str__
(
self
):
return
"{}"
.
format
(
self
.
endpoint
)
def
__eq__
(
self
,
j
):
return
self
.
endpint
==
j
.
endpoint
def
__ne__
(
self
,
j
):
return
not
self
==
j
class
Trainer
(
object
):
def
__init__
(
self
):
self
.
gpus
=
[]
self
.
endpoint
=
None
self
.
rank
=
None
def
__str__
(
self
):
return
"gpu:{} endpoint:{} rank:{}"
.
format
(
self
.
gpus
,
self
.
endpoint
,
self
.
rank
)
def
__eq__
(
self
,
t
):
if
len
(
self
.
gpus
)
!=
len
(
t
.
gpus
):
return
False
if
self
.
endpoint
!=
t
.
endpoint
or
\
self
.
rank
!=
t
.
rank
:
return
False
for
a
,
b
in
zip
(
self
.
gpus
,
t
.
gpus
):
if
a
!=
b
:
return
False
return
True
def
__ne__
(
self
,
t
):
return
not
self
==
t
def
rank
(
self
):
return
self
.
rank
class
Pod
(
object
):
def
__init__
(
self
):
self
.
rank
=
None
self
.
id
=
None
self
.
addr
=
None
self
.
port
=
None
self
.
trainers
=
[]
self
.
gpus
=
[]
def
__str__
(
self
):
return
"rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}"
.
format
(
self
.
rank
,
self
.
id
,
self
.
addr
,
self
.
port
,
self
.
gpus
,
[
str
(
t
)
for
t
in
self
.
trainers
])
def
__eq__
(
self
,
pod
):
if
self
.
rank
!=
pod
.
rank
or
\
self
.
id
!=
pod
.
id
or
\
self
.
addr
!=
pod
.
addr
or
\
self
.
port
!=
pod
.
port
:
logger
.
debug
(
"pod {} != pod"
.
format
(
self
,
pod
))
return
False
if
len
(
self
.
trainers
)
!=
len
(
pod
.
trainers
):
logger
.
debug
(
"trainers {} != {}"
.
format
(
self
.
trainers
,
pod
.
trainers
))
return
False
for
i
in
range
(
len
(
self
.
trainers
)):
if
self
.
trainers
[
i
]
!=
pod
.
trainers
[
i
]:
logger
.
debug
(
"trainer {} != {}"
.
format
(
self
.
trainers
[
i
],
pod
.
trainers
[
i
]))
return
False
return
True
def
__ne__
(
self
,
pod
):
return
not
self
==
pod
def
parse_response
(
self
,
res_pods
):
pass
def
rank
(
self
):
return
self
.
rank
def
get_visible_gpus
(
self
):
r
=
""
for
g
in
self
.
gpus
:
r
+=
"{},"
.
format
(
g
)
assert
r
!=
""
,
"this pod {} can't see any gpus"
.
format
(
self
)
r
=
r
[:
-
1
]
return
r
def
get_cluster
(
node_ips
,
node_ip
,
paddle_ports
,
selected_gpus
):
assert
type
(
paddle_ports
)
is
list
,
"paddle_ports must be list"
cluster
=
Cluster
(
hdfs
=
None
)
trainer_rank
=
0
for
node_rank
,
ip
in
enumerate
(
node_ips
):
pod
=
Pod
()
pod
.
rank
=
node_rank
pod
.
addr
=
ip
for
i
in
range
(
len
(
selected_gpus
)):
trainer
=
Trainer
()
trainer
.
gpus
.
append
(
selected_gpus
[
i
])
trainer
.
endpoint
=
"%s:%d"
%
(
ip
,
paddle_ports
[
i
])
trainer
.
rank
=
trainer_rank
trainer_rank
+=
1
pod
.
trainers
.
append
(
trainer
)
cluster
.
pods
.
append
(
pod
)
pod_rank
=
node_ips
.
index
(
node_ip
)
return
cluster
,
cluster
.
pods
[
pod_rank
]
def
get_cloud_cluster
(
selected_gpus
,
args_port
=
None
):
#you can automatically get ip info while using paddlecloud multi nodes mode.
node_ips
=
os
.
getenv
(
"PADDLE_TRAINERS"
)
assert
node_ips
is
not
None
,
"PADDLE_TRAINERS should not be None"
print
(
"node_ips:{}"
.
format
(
node_ips
))
node_ip
=
os
.
getenv
(
"POD_IP"
)
assert
node_ip
is
not
None
,
"POD_IP should not be None"
print
(
"node_ip:{}"
.
format
(
node_ip
))
node_rank
=
os
.
getenv
(
"PADDLE_TRAINER_ID"
)
assert
node_rank
is
not
None
,
"PADDLE_TRAINER_ID should not be None"
print
(
"node_rank:{}"
.
format
(
node_rank
))
node_ips
=
node_ips
.
split
(
","
)
num_nodes
=
len
(
node_ips
)
node_rank
=
int
(
node_rank
)
started_port
=
args_port
print
(
"num_nodes:"
,
num_nodes
)
if
num_nodes
>
1
:
try
:
paddle_port
=
int
(
os
.
getenv
(
"PADDLE_PORT"
,
""
))
paddle_port_num
=
int
(
os
.
getenv
(
"TRAINER_PORTS_NUM"
,
""
))
if
paddle_port_num
>=
len
(
selected_gpus
)
and
paddle_port
!=
args_port
:
logger
.
warning
(
"Use Cloud specified port:{}."
.
format
(
paddle_port
))
started_port
=
paddle_port
except
Exception
as
e
:
print
(
e
)
pass
if
started_port
is
None
:
started_port
=
6170
logger
.
debug
(
"parsed from args:node_ips:{}
\
node_ip:{} node_rank:{} started_port:{}"
.
format
(
node_ips
,
node_ip
,
node_rank
,
started_port
))
ports
=
[
x
for
x
in
range
(
started_port
,
started_port
+
len
(
selected_gpus
))]
cluster
,
pod
=
get_cluster
(
node_ips
,
node_ip
,
ports
,
selected_gpus
)
return
cluster
,
cluster
.
pods
[
node_rank
]
def
use_paddlecloud
():
node_ips
=
os
.
getenv
(
"PADDLE_TRAINERS"
,
None
)
node_ip
=
os
.
getenv
(
"POD_IP"
,
None
)
node_rank
=
os
.
getenv
(
"PADDLE_TRAINER_ID"
,
None
)
if
node_ips
is
None
or
node_ip
is
None
or
node_rank
is
None
:
return
False
else
:
return
True
class
TrainerProc
(
object
):
def
__init__
(
self
):
self
.
proc
=
None
self
.
log_fn
=
None
self
.
log_offset
=
None
self
.
rank
=
None
self
.
local_rank
=
None
self
.
cmd
=
None
def
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
None
):
current_env
=
copy
.
copy
(
os
.
environ
.
copy
())
#paddle broadcast ncclUniqueId use socket, and
#proxy maybe make trainers unreachable, so delete them.
#if we set them to "", grpc will log error message "bad uri"
#so just delete them.
current_env
.
pop
(
"http_proxy"
,
None
)
current_env
.
pop
(
"https_proxy"
,
None
)
procs
=
[]
for
idx
,
t
in
enumerate
(
pod
.
trainers
):
proc_env
=
{
"FLAGS_selected_gpus"
:
"%s"
%
","
.
join
([
str
(
g
)
for
g
in
t
.
gpus
]),
"PADDLE_TRAINER_ID"
:
"%d"
%
t
.
rank
,
"PADDLE_CURRENT_ENDPOINT"
:
"%s"
%
t
.
endpoint
,
"PADDLE_TRAINERS_NUM"
:
"%d"
%
cluster
.
trainers_nranks
(),
"PADDLE_TRAINER_ENDPOINTS"
:
","
.
join
(
cluster
.
trainers_endpoints
())
}
current_env
.
update
(
proc_env
)
logger
.
debug
(
"trainer proc env:{}"
.
format
(
current_env
))
# cmd = [sys.executable, "-u", training_script]
logger
.
info
(
"start trainer proc:{} env:{}"
.
format
(
cmd
,
proc_env
))
fn
=
None
if
log_dir
is
not
None
:
os
.
system
(
"mkdir -p {}"
.
format
(
log_dir
))
fn
=
open
(
"%s/workerlog.%d"
%
(
log_dir
,
idx
),
"a"
)
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
,
stdout
=
fn
,
stderr
=
fn
)
else
:
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
)
tp
=
TrainerProc
()
tp
.
proc
=
proc
tp
.
rank
=
t
.
rank
tp
.
local_rank
=
idx
tp
.
log_fn
=
fn
tp
.
log_offset
=
fn
.
tell
()
if
fn
else
None
tp
.
cmd
=
cmd
procs
.
append
(
proc
)
return
procs
core/engine/local_cluster.py
浏览文件 @
0a4ba25e
...
@@ -19,9 +19,14 @@ import copy
...
@@ -19,9 +19,14 @@ import copy
import
os
import
os
import
sys
import
sys
import
subprocess
import
subprocess
import
logging
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.utils
import
envs
from
paddlerec.core.utils
import
envs
import
paddlerec.core.engine.cluster_utils
as
cluster_utils
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
LocalClusterEngine
(
Engine
):
class
LocalClusterEngine
(
Engine
):
...
@@ -97,42 +102,70 @@ class LocalClusterEngine(Engine):
...
@@ -97,42 +102,70 @@ class LocalClusterEngine(Engine):
stderr
=
fn
,
stderr
=
fn
,
cwd
=
os
.
getcwd
())
cwd
=
os
.
getcwd
())
procs
.
append
(
proc
)
procs
.
append
(
proc
)
elif
fleet_mode
.
upper
()
==
"COLLECTIVE"
:
elif
fleet_mode
.
upper
()
==
"COLLECTIVE"
:
selected_gpus
=
self
.
envs
[
"selected_gpus"
].
split
(
","
)
cuda_visible_devices
=
os
.
getenv
(
"CUDA_VISIBLE_DEVICES"
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
selected_gpus
=
[
x
.
strip
()
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
)
]
else
:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list
=
cuda_visible_devices
.
split
(
','
)
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
):
assert
x
in
cuda_visible_devices_list
,
"Can't find "
\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."
\
%
(
x
,
cuda_visible_devices
)
selected_gpus
=
[
cuda_visible_devices_list
.
index
(
x
.
strip
())
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
)
]
selected_gpus_num
=
len
(
selected_gpus
)
selected_gpus_num
=
len
(
selected_gpus
)
for
i
in
range
(
selected_gpus_num
-
1
):
while
True
:
new_port
=
envs
.
find_free_port
()
if
new_port
not
in
ports
:
ports
.
append
(
new_port
)
break
user_endpoints
=
","
.
join
([
"127.0.0.1:"
+
str
(
x
)
for
x
in
ports
])
factory
=
"paddlerec.core.factory"
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
for
i
in
range
(
selected_gpus_num
):
print
(
"use_paddlecloud_flag:{}"
.
format
(
current_env
.
update
({
cluster_utils
.
use_paddlecloud
()))
"PADDLE_TRAINER_ENDPOINTS"
:
user_endpoints
,
if
cluster_utils
.
use_paddlecloud
():
"PADDLE_CURRENT_ENDPOINTS"
:
user_endpoints
[
i
],
cluster
,
pod
=
cluster_utils
.
get_cloud_cluster
(
selected_gpus
)
"PADDLE_TRAINERS_NUM"
:
str
(
worker_num
),
logger
.
info
(
"get cluster from cloud:{}"
.
format
(
cluster
))
"TRAINING_ROLE"
:
"TRAINER"
,
procs
=
cluster_utils
.
start_local_trainers
(
"PADDLE_TRAINER_ID"
:
str
(
i
),
cluster
,
pod
,
cmd
,
log_dir
=
logs_dir
)
"FLAGS_selected_gpus"
:
str
(
selected_gpus
[
i
]),
"PADDLEREC_GPU_NUMS"
:
str
(
selected_gpus_num
)
else
:
})
# trainers_num = 1 or not use paddlecloud ips="a,b"
for
i
in
range
(
selected_gpus_num
-
1
):
os
.
system
(
"mkdir -p {}"
.
format
(
logs_dir
))
while
True
:
fn
=
open
(
"%s/worker.%d"
%
(
logs_dir
,
i
),
"w"
)
new_port
=
envs
.
find_free_port
()
log_fns
.
append
(
fn
)
if
new_port
not
in
ports
:
proc
=
subprocess
.
Popen
(
ports
.
append
(
new_port
)
cmd
,
break
env
=
current_env
,
user_endpoints
=
","
.
join
(
stdout
=
fn
,
[
"127.0.0.1:"
+
str
(
x
)
for
x
in
ports
])
stderr
=
fn
,
for
i
in
range
(
selected_gpus_num
):
cwd
=
os
.
getcwd
())
current_env
.
update
({
procs
.
append
(
proc
)
"PADDLE_TRAINER_ENDPOINTS"
:
user_endpoints
,
"PADDLE_CURRENT_ENDPOINTS"
:
user_endpoints
[
i
],
"PADDLE_TRAINERS_NUM"
:
str
(
worker_num
),
"TRAINING_ROLE"
:
"TRAINER"
,
"PADDLE_TRAINER_ID"
:
str
(
i
),
"FLAGS_selected_gpus"
:
str
(
selected_gpus
[
i
]),
"PADDLEREC_GPU_NUMS"
:
str
(
selected_gpus_num
)
})
os
.
system
(
"mkdir -p {}"
.
format
(
logs_dir
))
fn
=
open
(
"%s/worker.%d"
%
(
logs_dir
,
i
),
"w"
)
log_fns
.
append
(
fn
)
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
,
stdout
=
fn
,
stderr
=
fn
,
cwd
=
os
.
getcwd
())
procs
.
append
(
proc
)
# only wait worker to finish here
# only wait worker to finish here
for
i
,
proc
in
enumerate
(
procs
):
for
i
,
proc
in
enumerate
(
procs
):
...
...
models/multitask/mmoe/config.yaml
浏览文件 @
0a4ba25e
...
@@ -49,10 +49,12 @@ runner:
...
@@ -49,10 +49,12 @@ runner:
save_checkpoint_path
:
"
increment"
save_checkpoint_path
:
"
increment"
save_inference_path
:
"
inference"
save_inference_path
:
"
inference"
print_interval
:
1
print_interval
:
1
phases
:
[
train
]
-
name
:
infer_runner
-
name
:
infer_runner
class
:
infer
class
:
infer
init_model_path
:
"
increment/1"
init_model_path
:
"
increment/1"
device
:
cpu
device
:
cpu
phases
:
[
infer
]
phase
:
phase
:
-
name
:
train
-
name
:
train
...
...
models/rank/dnn/README.md
浏览文件 @
0a4ba25e
...
@@ -30,13 +30,12 @@
...
@@ -30,13 +30,12 @@
### 一键下载训练及测试数据
### 一键下载训练及测试数据
```
bash
```
bash
sh
download_data
.sh
sh
run
.sh
```
```
执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。全量训练数据放置于
`./train_data_full/`
,全量测试数据放置于
`./test_data_full/`
,用于快速验证的训练数据与测试数据放置于
`./train_data/`
与
`./
test_data/`
。
进入models/rank/dnn/data目录下,执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。原始的全量数据放置于
`./train_data_full/`
,原始的全量测试数据放置于
`./test_data_full/`
,原始的用于快速验证的训练数据与测试数据放置于
`./train_data/`
与
`./test_data/`
。处理后的全量训练数据放置于
`./slot_train_data_full/`
,处理后的全量测试数据放置于
`./slot_test_data_full/`
,处理后的用于快速验证的训练数据与测试数据放置于
`./slot_train_data/`
与
`./slot_
test_data/`
。
执行该脚本的理想输出为:
执行该脚本的理想输出为:
```
bash
```
bash
>
sh download_data.sh
--2019-11-26
06:31:33-- https://fleet.bj.bcebos.com/ctr_data.tar.gz
--2019-11-26
06:31:33-- https://fleet.bj.bcebos.com/ctr_data.tar.gz
Resolving fleet.bj.bcebos.com... 10.180.112.31
Resolving fleet.bj.bcebos.com... 10.180.112.31
Connecting to fleet.bj.bcebos.com|10.180.112.31|:443... connected.
Connecting to fleet.bj.bcebos.com|10.180.112.31|:443... connected.
...
@@ -100,7 +99,7 @@ def get_dataset(inputs, args)
...
@@ -100,7 +99,7 @@ def get_dataset(inputs, args)
3.
创建一个子类,继承dataset的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用
`MultiSlotDataGenerator`
;若已经完成了预处理并保存为数据文件,可以直接以
`string`
的方式进行读取,使用
`MultiSlotStringDataGenerator`
,能够进一步加速。在示例代码,我们继承并实现了名为
`CriteoDataset`
的dataset子类,使用
`MultiSlotDataGenerator`
方法。
3.
创建一个子类,继承dataset的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用
`MultiSlotDataGenerator`
;若已经完成了预处理并保存为数据文件,可以直接以
`string`
的方式进行读取,使用
`MultiSlotStringDataGenerator`
,能够进一步加速。在示例代码,我们继承并实现了名为
`CriteoDataset`
的dataset子类,使用
`MultiSlotDataGenerator`
方法。
4.
继承并实现基类中的
`generate_sample`
函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.)
4.
继承并实现基类中的
`generate_sample`
函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.)
5.
在这个可以迭代的函数中,如示例代码中的
`def reader()`
,我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。
5.
在这个可以迭代的函数中,如示例代码中的
`def reader()`
,我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。
6.
最后,我们需要将数据整理为特定的格式,才能够被dataset正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的
`inputs`
必须是严格一一对应的
,并转换为类似字典的形式。在示例代码中,我们使用
`zip`
的方法将参数名与数值构成的元组组成了一个list,并将其yield输出。如果展开来看,我们输出的数据形如
`[('dense_feature',[value]),('C1',[value]),('C2',[value]),...,('C26',[value]),('label',[value])]`
6.
最后,我们需要将数据整理为特定的格式,才能够被dataset正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的
`inputs`
必须是严格一一对应的
。在示例代码中,我们将数据整理成
`click:value dense_feature:value ... dense_feature:value 1:value ... 26:value`
的格式。用print输出是因为我们在run.sh中将结果重定向到slot_train_data等文件中,由模型直接读取。在用户自定义使用时,可以使用
`zip`
的方法将参数名与数值构成的元组组成了一个list,并将其yield输出,并在config.yaml中的data_converter参数指定reader的路径。
```
python
```
python
...
@@ -113,11 +112,22 @@ hash_dim_ = 1000001
...
@@ -113,11 +112,22 @@ hash_dim_ = 1000001
continuous_range_
=
range
(
1
,
14
)
continuous_range_
=
range
(
1
,
14
)
categorical_range_
=
range
(
14
,
40
)
categorical_range_
=
range
(
14
,
40
)
class
CriteoDataset
(
dg
.
MultiSlotDataGenerator
):
class
CriteoDataset
(
dg
.
MultiSlotDataGenerator
):
"""
DacDataset: inheritance MultiSlotDataGeneratior, Implement data reading
Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
"""
def
generate_sample
(
self
,
line
):
def
generate_sample
(
self
,
line
):
"""
Read the data line by line and process it as a dictionary
"""
def
reader
():
def
reader
():
"""
This function needs to be implemented by the user, based on data format
"""
features
=
line
.
rstrip
(
'
\n
'
).
split
(
'
\t
'
)
features
=
line
.
rstrip
(
'
\n
'
).
split
(
'
\t
'
)
dense_feature
=
[]
dense_feature
=
[]
sparse_feature
=
[]
sparse_feature
=
[]
...
@@ -137,11 +147,16 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
...
@@ -137,11 +147,16 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
for
idx
in
categorical_range_
:
for
idx
in
categorical_range_
:
feature_name
.
append
(
"C"
+
str
(
idx
-
13
))
feature_name
.
append
(
"C"
+
str
(
idx
-
13
))
feature_name
.
append
(
"label"
)
feature_name
.
append
(
"label"
)
s
=
"click:"
+
str
(
label
[
0
])
yield
zip
(
feature_name
,
[
dense_feature
]
+
sparse_feature
+
[
label
])
for
i
in
dense_feature
:
s
+=
" dense_feature:"
+
str
(
i
)
for
i
in
range
(
1
,
1
+
len
(
categorical_range_
)):
s
+=
" "
+
str
(
i
)
+
":"
+
str
(
sparse_feature
[
i
-
1
][
0
])
print
(
s
.
strip
())
# add print for data preprocessing
return
reader
return
reader
d
=
CriteoDataset
()
d
=
CriteoDataset
()
d
.
run_from_stdin
()
d
.
run_from_stdin
()
```
```
...
@@ -149,117 +164,124 @@ d.run_from_stdin()
...
@@ -149,117 +164,124 @@ d.run_from_stdin()
我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令
我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令
`cat 数据文件 | python dataset读取python文件`
进行dataset代码的调试:
`cat 数据文件 | python dataset读取python文件`
进行dataset代码的调试:
```
bash
```
bash
cat
train_data/part-0 | python
dataset_generator
.py
cat
train_data/part-0 | python
get_slot_data
.py
```
```
输出的数据格式如下:
输出的数据格式如下:
`
dense_input:size ; dense_input:value ; sparse_input:size ; sparse_input:value ; ... ; sparse_input:size ; sparse_input:value ; label:size ; label
:value `
`
label:value dense_input:value ... dense_input:value sparse_input:value ... sparse_input
:value `
理想的输出为(截取了一个片段):
理想的输出为(截取了一个片段):
```
bash
```
bash
...
...
13 0.05 0.00663349917081 0.05 0.0 0.02159375 0.008 0.15 0.04 0.362 0.1 0.2 0.0 0.04 1 715353 1 817085 1 851010 1 833725 1 286835 1 948614 1 881652 1 507110 1 27346 1 646986 1 643076 1 200960 1 18464 1 202774 1 532679 1 729573 1 342789 1 562805 1 880474 1 984402 1 666449 1 26235 1 700326 1 452909 1 884722 1 787527 1 0
click:0 dense_feature:0.05 dense_feature:0.00663349917081 dense_feature:0.05 dense_feature:0.0 dense_feature:0.02159375 dense_feature:0.008 dense_feature:0.15 dense_feature:0.04 dense_feature:0.362 dense_feature:0.1 dense_feature:0.2 dense_feature:0.0 dense_feature:0.04 1:715353 2:817085 3:851010 4:833725 5:286835 6:948614 7:881652 8:507110 9:27346 10:646986 11:643076 12:200960 13:18464 14:202774 15:532679 16:729573 17:342789 18:562805 19:880474 20:984402 21:666449 22:26235 23:700326 24:452909 25:884722 26:787527
...
...
```
```
#
#
## 模型组网
## 模型组网
### 数据输入声明
### 数据输入声明
正如数据准备章节所介绍,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:
`dense_input`
用于输入连续数据,维度由超参数
`dense_feature_dim`
指定,数据类型是归一化后的浮点型数据。
`sparse_input_ids`
用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为
`C1~C26`
的26个稀疏参数输入,并设置
`lod_level=1`
,代表其为变长数据,数据类型为整数;最后是每条样本的
`label`
,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。
正如数据准备章节所介绍,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:
`dense_input`
用于输入连续数据,维度由超参数
`dense_input_dim`
指定,数据类型是归一化后的浮点型数据。
`sparse_inputs`
用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为
`1~26`
的26个稀疏参数输入,数据类型为整数;最后是每条样本的
`label`
,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。
在Paddle中数据输入的声明使用
`paddle.fluid.data()`
,会创建指定类型的占位符,数据IO会依据此定义进行数据的输入。
```
python
dense_input
=
fluid
.
data
(
name
=
"dense_input"
,
shape
=
[
-
1
,
args
.
dense_feature_dim
],
dtype
=
"float32"
)
sparse_input_ids
=
[
fluid
.
data
(
name
=
"C"
+
str
(
i
),
shape
=
[
-
1
,
1
],
lod_level
=
1
,
dtype
=
"int64"
)
for
i
in
range
(
1
,
27
)
]
label
=
fluid
.
data
(
name
=
"label"
,
shape
=
[
-
1
,
1
],
dtype
=
"int64"
)
inputs
=
[
dense_input
]
+
sparse_input_ids
+
[
label
]
```
### CTR-DNN模型组网
### CTR-DNN模型组网
CTR-DNN模型的组网比较直观,本质是一个二分类任务,代码参考
`model.py`
。模型主要组成是一个
`Embedding`
层,
三
个
`FC`
层,以及相应的分类任务的loss计算和auc计算。
CTR-DNN模型的组网比较直观,本质是一个二分类任务,代码参考
`model.py`
。模型主要组成是一个
`Embedding`
层,
四
个
`FC`
层,以及相应的分类任务的loss计算和auc计算。
#### Embedding层
#### Embedding层
首先介绍Embedding层的搭建方式:
`Embedding`
层的输入是
`sparse_input`
,
shape由超参的
`sparse_feature_dim`
和
`embedding_siz
e`
定义。需要特别解释的是
`is_sparse`
参数,当我们指定
`is_sprase=True`
后,计算图会将该参数视为稀疏参数,反向更新以及分布式通信时,都以稀疏的方式进行,会极大的提升运行效率,同时保证效果一致。
首先介绍Embedding层的搭建方式:
`Embedding`
层的输入是
`sparse_input`
,
由超参的
`sparse_feature_number`
和
`sparse_feature_dimshap
e`
定义。需要特别解释的是
`is_sparse`
参数,当我们指定
`is_sprase=True`
后,计算图会将该参数视为稀疏参数,反向更新以及分布式通信时,都以稀疏的方式进行,会极大的提升运行效率,同时保证效果一致。
各个稀疏的输入通过Embedding层后,将其合并起来,置于一个list内,以方便进行concat的操作。
各个稀疏的输入通过Embedding层后,将其合并起来,置于一个list内,以方便进行concat的操作。
```
python
```
python
def
embedding_layer
(
input
):
def
embedding_layer
(
input
):
return
fluid
.
layers
.
embedding
(
if
self
.
distributed_embedding
:
emb
=
fluid
.
contrib
.
layers
.
sparse_embedding
(
input
=
input
,
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()))
else
:
emb
=
fluid
.
layers
.
embedding
(
input
=
input
,
input
=
input
,
is_sparse
=
True
,
is_sparse
=
True
,
size
=
[
args
.
sparse_feature_dim
,
is_distributed
=
self
.
is_distributed
,
args
.
embedding_size
],
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()),
initializer
=
fluid
.
initializer
.
Uniform
()))
)
emb_sum
=
fluid
.
layers
.
sequence_pool
(
input
=
emb
,
pool_type
=
'sum'
)
return
emb_sum
sparse_embed_seq
=
list
(
map
(
embedding_layer
,
inputs
[
1
:
-
1
]
))
# [C1~C26]
sparse_embed_seq
=
list
(
map
(
embedding_layer
,
self
.
sparse_inputs
))
# [C1~C26]
```
```
#### FC层
#### FC层
将离散数据通过embedding查表得到的值,与连续数据的输入进行
`concat`
操作,合为一个整体输入,作为全链接层的原始输入。我们共设计了
3层FC,每层FC的输出维度都为400
,每层FC都后接一个
`relu`
激活函数,每层FC的初始化方式为符合正态分布的随机初始化,标准差与上一层的输出维度的平方根成反比。
将离散数据通过embedding查表得到的值,与连续数据的输入进行
`concat`
操作,合为一个整体输入,作为全链接层的原始输入。我们共设计了
4层FC,每层FC的输出维度由超参
`fc_sizes`
指定
,每层FC都后接一个
`relu`
激活函数,每层FC的初始化方式为符合正态分布的随机初始化,标准差与上一层的输出维度的平方根成反比。
```
python
```
python
concated
=
fluid
.
layers
.
concat
(
sparse_embed_seq
+
inputs
[
0
:
1
],
axis
=
1
)
concated
=
fluid
.
layers
.
concat
(
sparse_embed_seq
+
[
self
.
dense_input
],
axis
=
1
)
fc1
=
fluid
.
layers
.
fc
(
input
=
concated
,
fcs
=
[
concated
]
size
=
400
,
hidden_layers
=
envs
.
get_global_env
(
"hyper_parameters.fc_sizes"
)
act
=
"relu"
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
for
size
in
hidden_layers
:
scale
=
1
/
math
.
sqrt
(
concated
.
shape
[
1
]))),
output
=
fluid
.
layers
.
fc
(
)
input
=
fcs
[
-
1
],
fc2
=
fluid
.
layers
.
fc
(
size
=
size
,
input
=
fc1
,
act
=
'relu'
,
size
=
400
,
param_attr
=
fluid
.
ParamAttr
(
act
=
"relu"
,
initializer
=
fluid
.
initializer
.
Normal
(
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1.0
/
math
.
sqrt
(
fcs
[
-
1
].
shape
[
1
]))))
scale
=
1
/
math
.
sqrt
(
fc1
.
shape
[
1
]))),
fcs
.
append
(
output
)
)
fc3
=
fluid
.
layers
.
fc
(
input
=
fc2
,
size
=
400
,
act
=
"relu"
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1
/
math
.
sqrt
(
fc2
.
shape
[
1
]))),
)
```
```
#### Loss及Auc计算
#### Loss及Auc计算
-
预测的结果通过一个输出shape为2的FC层给出,该FC层的激活函数是softmax,会给出每条样本分属于正负样本的概率。
-
预测的结果通过一个输出shape为2的FC层给出,该FC层的激活函数是softmax,会给出每条样本分属于正负样本的概率。
-
每条样本的损失由交叉熵给出,交叉熵的输入维度为[batch_size,2],数据类型为float,label的输入维度为[batch_size,1],数据类型为int。
-
每条样本的损失由交叉熵给出,交叉熵的输入维度为[batch_size,2],数据类型为float,label的输入维度为[batch_size,1],数据类型为int。
-
该batch的损失
`avg_cost`
是各条样本的损失之和
-
该batch的损失
`avg_cost`
是各条样本的损失之和
-
我们同时还会计算预测的auc,auc的结果由
`fluid.layers.auc()`
给出,该层的返回值有三个,分别是
全局auc:
`auc_var`
,当前batch的auc:
`batch_auc_var`
,以及auc_states:
`auc_states
`
,auc_states包含了
`batch_stat_pos, batch_stat_neg, stat_pos, stat_neg`
信息。
`batch_auc`
我们取近20个batch的平均,由参数
`slide_steps=20`
指定,roc曲线的离散化的临界数值设置为4096,由
`num_thresholds=2**12`
指定。
-
我们同时还会计算预测的auc,auc的结果由
`fluid.layers.auc()`
给出,该层的返回值有三个,分别是
从第一个batch累计到当前batch的全局auc:
`auc`
,最近几个batch的auc:
`batch_auc`
,以及auc_states:
`_
`
,auc_states包含了
`batch_stat_pos, batch_stat_neg, stat_pos, stat_neg`
信息。
`batch_auc`
我们取近20个batch的平均,由参数
`slide_steps=20`
指定,roc曲线的离散化的临界数值设置为4096,由
`num_thresholds=2**12`
指定。
```
```
predict = fluid.layers.fc(
predict = fluid.layers.fc(
input=fc3,
input=fcs[-1],
size=2,
size=2,
act="softmax",
act="softmax",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))),
scale=1 / math.sqrt(fcs[-1].shape[1]))))
)
cost = fluid.layers.cross_entropy(input=predict, label=inputs[-1])
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=inputs[-1])
auc_var, batch_auc_var, auc_states = fluid.layers.auc(
input=predict,
label=inputs[-1],
num_thresholds=2**12,
slide_steps=20)
```
完成上述组网后,我们最终可以通过训练拿到
`avg_cost`
与
`auc`
两个重要指标。
self.predict = predict
auc, batch_auc, _ = fluid.layers.auc(input=self.predict,label=self.label_input,
num_thresholds=2**12,
slide_steps=20)
cost = fluid.layers.cross_entropy(
input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_mean(cost)
```
完成上述组网后,我们最终可以通过训练拿到
`BATCH_AUC`
与
`auc`
两个重要指标。
```
PaddleRec: Runner single_cpu_infer Begin
Executor Mode: infer
processor_register begin
Running SingleInstance.
Running SingleNetwork.
Running SingleInferStartup.
Running SingleInferRunner.
load persistables from increment_dnn/3
batch: 20, BATCH_AUC: [0.75670043], AUC: [0.77490453]
batch: 40, BATCH_AUC: [0.77020144], AUC: [0.77490437]
batch: 60, BATCH_AUC: [0.77464683], AUC: [0.77490435]
batch: 80, BATCH_AUC: [0.76858989], AUC: [0.77490416]
batch: 100, BATCH_AUC: [0.75728286], AUC: [0.77490362]
batch: 120, BATCH_AUC: [0.75007016], AUC: [0.77490286]
...
batch: 720, BATCH_AUC: [0.76840144], AUC: [0.77489881]
batch: 740, BATCH_AUC: [0.76659033], AUC: [0.77489854]
batch: 760, BATCH_AUC: [0.77332639], AUC: [0.77489849]
batch: 780, BATCH_AUC: [0.78361653], AUC: [0.77489874]
Infer phase2 of epoch increment_dnn/3 done, use time: 52.7707588673, global metrics: BATCH_AUC=[0.78361653], AUC=[0.77489874]
PaddleRec Finish
```
## 流式训练(OnlineLearning)任务启动及配置流程
## 流式训练(OnlineLearning)任务启动及配置流程
...
@@ -387,5 +409,5 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc(
...
@@ -387,5 +409,5 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc(
```
```
4.
准备好数据后, 即可按照标准的训练流程进行流式训练了
4.
准备好数据后, 即可按照标准的训练流程进行流式训练了
```
shell
```
shell
python
-m
paddlerec.run
-m
models/r
erank/ctr-
dnn/config.yaml
python
-m
paddlerec.run
-m
models/r
ank/
dnn/config.yaml
```
```
models/rank/dnn/data/get_slot_data.py
浏览文件 @
0a4ba25e
...
@@ -61,8 +61,7 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
...
@@ -61,8 +61,7 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
s
+=
" dense_feature:"
+
str
(
i
)
s
+=
" dense_feature:"
+
str
(
i
)
for
i
in
range
(
1
,
1
+
len
(
categorical_range_
)):
for
i
in
range
(
1
,
1
+
len
(
categorical_range_
)):
s
+=
" "
+
str
(
i
)
+
":"
+
str
(
sparse_feature
[
i
-
1
][
0
])
s
+=
" "
+
str
(
i
)
+
":"
+
str
(
sparse_feature
[
i
-
1
][
0
])
print
(
s
.
strip
())
print
(
s
.
strip
())
# add print for data preprocessing
yield
None
return
reader
return
reader
...
...
models/recall/word2vec/README.md
浏览文件 @
0a4ba25e
...
@@ -222,15 +222,18 @@ Infer phase2 of epoch 3 done, use time: 4.43099021912, global metrics: acc=[1.]
...
@@ -222,15 +222,18 @@ Infer phase2 of epoch 3 done, use time: 4.43099021912, global metrics: acc=[1.]
## 论文复现
## 论文复现
1.
用原论文的完整数据复现论文效果需要在config.yaml修改超参:
1.
用原论文的完整数据复现论文效果需要在config.yaml修改超参:
```
- name: dataset_train
- name: dataset_train
batch_size: 100 # 1. 修改batch_size为100
batch_size: 100 # 1. 修改batch_size为100
type: DataLoader
type: DataLoader
data_path: "{workspace}/data/all_train" # 2. 修改数据为全量训练数据
data_path: "{workspace}/data/all_train" # 2. 修改数据为全量训练数据
word_count_dict_path: "{workspace}/data/all_dict/
word_count_dict.txt" # 3. 修改词表为全量词表
word_count_dict_path: "{workspace}/data/all_dict/word_count_dict.txt" # 3. 修改词表为全量词表
data_converter: "{workspace}/w2v_reader.py"
data_converter: "{workspace}/w2v_reader.py"
- name: dataset_infer
data_path: "{workspace}/data/all_test" # 4. 修改数据为全量测试数据
word_id_dict_path: "{workspace}/data/all_dict/word_id_dict.txt" # 5. 修改词表为全量词表
-
name: single_cpu_train
```
-
epochs: # 4. 修改config.yaml中runner的epochs为5。
修改后运行方案:修改config.yaml中的'workspace'为config.yaml的目录位置,执行
修改后运行方案:修改config.yaml中的'workspace'为config.yaml的目录位置,执行
```
```
...
...
models/recall/youtube_dnn/README.md
浏览文件 @
0a4ba25e
...
@@ -8,7 +8,7 @@
...
@@ -8,7 +8,7 @@
├── data.txt
├── data.txt
├── test
├── test
├── data.txt
├── data.txt
├── generate_ramdom_data # 随机训练数据生成文件
├── generate_ramdom_data
.py
# 随机训练数据生成文件
├── __init__.py
├── __init__.py
├── README.md # 文档
├── README.md # 文档
├── model.py #模型文件
├── model.py #模型文件
...
@@ -107,7 +107,7 @@ python infer.py --use_gpu 1 --test_epoch 19 --inference_model_dir ./inference_yo
...
@@ -107,7 +107,7 @@ python infer.py --use_gpu 1 --test_epoch 19 --inference_model_dir ./inference_yo
```
```
### 运行
### 运行
```
```
python -m paddlerec.run -m paddlerec.models.recall.
w2v
python -m paddlerec.run -m paddlerec.models.recall.
youtube_dnn
```
```
### 结果展示
### 结果展示
...
...
run.py
浏览文件 @
0a4ba25e
...
@@ -348,6 +348,7 @@ def cluster_engine(args):
...
@@ -348,6 +348,7 @@ def cluster_engine(args):
cluster_envs
[
"fleet_mode"
]
=
fleet_mode
cluster_envs
[
"fleet_mode"
]
=
fleet_mode
cluster_envs
[
"engine_role"
]
=
"WORKER"
cluster_envs
[
"engine_role"
]
=
"WORKER"
cluster_envs
[
"log_dir"
]
=
"logs"
cluster_envs
[
"train.trainer.trainer"
]
=
trainer
cluster_envs
[
"train.trainer.trainer"
]
=
trainer
cluster_envs
[
"train.trainer.engine"
]
=
"cluster"
cluster_envs
[
"train.trainer.engine"
]
=
"cluster"
cluster_envs
[
"train.trainer.executor_mode"
]
=
executor_mode
cluster_envs
[
"train.trainer.executor_mode"
]
=
executor_mode
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录