Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
PaddleRec
提交
1a044578
P
PaddleRec
项目概览
BaiXuePrincess
/
PaddleRec
与 Fork 源项目一致
Fork自
PaddlePaddle / PaddleRec
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleRec
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1a044578
编写于
9月 28, 2020
作者:
Y
yinhaofeng
提交者:
GitHub
9月 28, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'master' into wdeep
上级
3c64fd3d
7cfe354b
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
541 addition
and
121 deletion
+541
-121
core/engine/cluster/cluster.py
core/engine/cluster/cluster.py
+38
-2
core/engine/cluster_utils.py
core/engine/cluster_utils.py
+324
-0
core/engine/local_cluster.py
core/engine/local_cluster.py
+63
-30
models/multitask/mmoe/config.yaml
models/multitask/mmoe/config.yaml
+2
-0
models/rank/dnn/README.md
models/rank/dnn/README.md
+103
-81
models/rank/dnn/data/get_slot_data.py
models/rank/dnn/data/get_slot_data.py
+1
-2
models/recall/word2vec/README.md
models/recall/word2vec/README.md
+7
-4
models/recall/youtube_dnn/README.md
models/recall/youtube_dnn/README.md
+2
-2
run.py
run.py
+1
-0
未找到文件。
core/engine/cluster/cluster.py
浏览文件 @
1a044578
...
...
@@ -19,10 +19,16 @@ import copy
import
os
import
subprocess
import
warnings
import
sys
import
logging
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.factory
import
TrainerFactory
from
paddlerec.core.utils
import
envs
import
paddlerec.core.engine.cluster_utils
as
cluster_utils
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
ClusterEngine
(
Engine
):
...
...
@@ -47,6 +53,36 @@ class ClusterEngine(Engine):
self
.
backend
))
def
start_worker_procs
(
self
):
if
(
envs
.
get_runtime_environ
(
"fleet_mode"
)
==
"COLLECTIVE"
):
#trainer_ports = os.getenv("TRAINER_PORTS", None).split(",")
cuda_visible_devices
=
os
.
getenv
(
"CUDA_VISIBLE_DEVICES"
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
selected_gpus
=
range
(
int
(
os
.
getenv
(
"TRAINER_GPU_CARD_COUNT"
)))
else
:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list
=
cuda_visible_devices
.
split
(
','
)
for
x
in
range
(
int
(
os
.
getenv
(
"TRAINER_GPU_CARD_COUNT"
))):
assert
x
in
cuda_visible_devices_list
,
"Can't find "
\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."
\
%
(
x
,
cuda_visible_devices
)
selected_gpus
=
[
cuda_visible_devices_list
.
index
(
x
)]
print
(
"selected_gpus:{}"
.
format
(
selected_gpus
))
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
logs_dir
=
envs
.
get_runtime_environ
(
"log_dir"
)
print
(
"use_paddlecloud_flag:{}"
.
format
(
cluster_utils
.
use_paddlecloud
()))
if
cluster_utils
.
use_paddlecloud
():
cluster
,
pod
=
cluster_utils
.
get_cloud_cluster
(
selected_gpus
)
logger
.
info
(
"get cluster from cloud:{}"
.
format
(
cluster
))
procs
=
cluster_utils
.
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
logs_dir
)
print
(
"cluster:{}"
.
format
(
cluster
))
print
(
"pod:{}"
.
format
(
pod
))
else
:
trainer
=
TrainerFactory
.
create
(
self
.
trainer
)
trainer
.
run
()
...
...
core/engine/cluster_utils.py
0 → 100644
浏览文件 @
1a044578
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
functools
import
logging
import
socket
import
time
import
os
import
signal
import
copy
import
sys
import
subprocess
from
contextlib
import
closing
import
socket
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
Cluster
(
object
):
def
__init__
(
self
,
hdfs
):
self
.
job_server
=
None
self
.
pods
=
[]
self
.
hdfs
=
None
self
.
job_stage_flag
=
None
def
__str__
(
self
):
return
"job_server:{} pods:{} job_stage_flag:{} hdfs:{}"
.
format
(
self
.
job_server
,
[
str
(
pod
)
for
pod
in
self
.
pods
],
self
.
job_stage_flag
,
self
.
hdfs
)
def
__eq__
(
self
,
cluster
):
if
len
(
self
.
pods
)
!=
len
(
cluster
.
pods
):
return
False
for
a
,
b
in
zip
(
self
.
pods
,
cluster
.
pods
):
if
a
!=
b
:
return
False
if
self
.
job_stage_flag
!=
cluster
.
job_stage_flag
:
return
False
return
True
def
__ne__
(
self
,
cluster
):
return
not
self
.
__eq__
(
cluster
)
def
update_pods
(
cluster
):
self
.
pods
=
copy
.
copy
(
cluster
.
pods
)
def
trainers_nranks
(
self
):
return
len
(
self
.
trainers_endpoints
())
def
pods_nranks
(
self
):
return
len
(
self
.
pods
)
def
trainers_endpoints
(
self
):
r
=
[]
for
pod
in
self
.
pods
:
for
t
in
pod
.
trainers
:
r
.
append
(
t
.
endpoint
)
return
r
def
pods_endpoints
(
self
):
r
=
[]
for
pod
in
self
.
pods
:
ep
=
"{}:{}"
.
format
(
pod
.
addr
,
pod
.
port
)
assert
pod
.
port
!=
None
and
pod
.
addr
!=
None
,
"{} not a valid endpoint"
.
format
(
ep
)
r
.
append
(
ep
)
return
r
def
get_pod_by_id
(
self
,
pod_id
):
for
pod
in
self
.
pods
:
if
str
(
pod_id
)
==
str
(
pod
.
id
):
return
pod
return
None
class
JobServer
(
object
):
def
__init__
(
self
):
self
.
endpoint
=
None
def
__str__
(
self
):
return
"{}"
.
format
(
self
.
endpoint
)
def
__eq__
(
self
,
j
):
return
self
.
endpint
==
j
.
endpoint
def
__ne__
(
self
,
j
):
return
not
self
==
j
class
Trainer
(
object
):
def
__init__
(
self
):
self
.
gpus
=
[]
self
.
endpoint
=
None
self
.
rank
=
None
def
__str__
(
self
):
return
"gpu:{} endpoint:{} rank:{}"
.
format
(
self
.
gpus
,
self
.
endpoint
,
self
.
rank
)
def
__eq__
(
self
,
t
):
if
len
(
self
.
gpus
)
!=
len
(
t
.
gpus
):
return
False
if
self
.
endpoint
!=
t
.
endpoint
or
\
self
.
rank
!=
t
.
rank
:
return
False
for
a
,
b
in
zip
(
self
.
gpus
,
t
.
gpus
):
if
a
!=
b
:
return
False
return
True
def
__ne__
(
self
,
t
):
return
not
self
==
t
def
rank
(
self
):
return
self
.
rank
class
Pod
(
object
):
def
__init__
(
self
):
self
.
rank
=
None
self
.
id
=
None
self
.
addr
=
None
self
.
port
=
None
self
.
trainers
=
[]
self
.
gpus
=
[]
def
__str__
(
self
):
return
"rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}"
.
format
(
self
.
rank
,
self
.
id
,
self
.
addr
,
self
.
port
,
self
.
gpus
,
[
str
(
t
)
for
t
in
self
.
trainers
])
def
__eq__
(
self
,
pod
):
if
self
.
rank
!=
pod
.
rank
or
\
self
.
id
!=
pod
.
id
or
\
self
.
addr
!=
pod
.
addr
or
\
self
.
port
!=
pod
.
port
:
logger
.
debug
(
"pod {} != pod"
.
format
(
self
,
pod
))
return
False
if
len
(
self
.
trainers
)
!=
len
(
pod
.
trainers
):
logger
.
debug
(
"trainers {} != {}"
.
format
(
self
.
trainers
,
pod
.
trainers
))
return
False
for
i
in
range
(
len
(
self
.
trainers
)):
if
self
.
trainers
[
i
]
!=
pod
.
trainers
[
i
]:
logger
.
debug
(
"trainer {} != {}"
.
format
(
self
.
trainers
[
i
],
pod
.
trainers
[
i
]))
return
False
return
True
def
__ne__
(
self
,
pod
):
return
not
self
==
pod
def
parse_response
(
self
,
res_pods
):
pass
def
rank
(
self
):
return
self
.
rank
def
get_visible_gpus
(
self
):
r
=
""
for
g
in
self
.
gpus
:
r
+=
"{},"
.
format
(
g
)
assert
r
!=
""
,
"this pod {} can't see any gpus"
.
format
(
self
)
r
=
r
[:
-
1
]
return
r
def
get_cluster
(
node_ips
,
node_ip
,
paddle_ports
,
selected_gpus
):
assert
type
(
paddle_ports
)
is
list
,
"paddle_ports must be list"
cluster
=
Cluster
(
hdfs
=
None
)
trainer_rank
=
0
for
node_rank
,
ip
in
enumerate
(
node_ips
):
pod
=
Pod
()
pod
.
rank
=
node_rank
pod
.
addr
=
ip
for
i
in
range
(
len
(
selected_gpus
)):
trainer
=
Trainer
()
trainer
.
gpus
.
append
(
selected_gpus
[
i
])
trainer
.
endpoint
=
"%s:%d"
%
(
ip
,
paddle_ports
[
i
])
trainer
.
rank
=
trainer_rank
trainer_rank
+=
1
pod
.
trainers
.
append
(
trainer
)
cluster
.
pods
.
append
(
pod
)
pod_rank
=
node_ips
.
index
(
node_ip
)
return
cluster
,
cluster
.
pods
[
pod_rank
]
def
get_cloud_cluster
(
selected_gpus
,
args_port
=
None
):
#you can automatically get ip info while using paddlecloud multi nodes mode.
node_ips
=
os
.
getenv
(
"PADDLE_TRAINERS"
)
assert
node_ips
is
not
None
,
"PADDLE_TRAINERS should not be None"
print
(
"node_ips:{}"
.
format
(
node_ips
))
node_ip
=
os
.
getenv
(
"POD_IP"
)
assert
node_ip
is
not
None
,
"POD_IP should not be None"
print
(
"node_ip:{}"
.
format
(
node_ip
))
node_rank
=
os
.
getenv
(
"PADDLE_TRAINER_ID"
)
assert
node_rank
is
not
None
,
"PADDLE_TRAINER_ID should not be None"
print
(
"node_rank:{}"
.
format
(
node_rank
))
node_ips
=
node_ips
.
split
(
","
)
num_nodes
=
len
(
node_ips
)
node_rank
=
int
(
node_rank
)
started_port
=
args_port
print
(
"num_nodes:"
,
num_nodes
)
if
num_nodes
>
1
:
try
:
paddle_port
=
int
(
os
.
getenv
(
"PADDLE_PORT"
,
""
))
paddle_port_num
=
int
(
os
.
getenv
(
"TRAINER_PORTS_NUM"
,
""
))
if
paddle_port_num
>=
len
(
selected_gpus
)
and
paddle_port
!=
args_port
:
logger
.
warning
(
"Use Cloud specified port:{}."
.
format
(
paddle_port
))
started_port
=
paddle_port
except
Exception
as
e
:
print
(
e
)
pass
if
started_port
is
None
:
started_port
=
6170
logger
.
debug
(
"parsed from args:node_ips:{}
\
node_ip:{} node_rank:{} started_port:{}"
.
format
(
node_ips
,
node_ip
,
node_rank
,
started_port
))
ports
=
[
x
for
x
in
range
(
started_port
,
started_port
+
len
(
selected_gpus
))]
cluster
,
pod
=
get_cluster
(
node_ips
,
node_ip
,
ports
,
selected_gpus
)
return
cluster
,
cluster
.
pods
[
node_rank
]
def
use_paddlecloud
():
node_ips
=
os
.
getenv
(
"PADDLE_TRAINERS"
,
None
)
node_ip
=
os
.
getenv
(
"POD_IP"
,
None
)
node_rank
=
os
.
getenv
(
"PADDLE_TRAINER_ID"
,
None
)
if
node_ips
is
None
or
node_ip
is
None
or
node_rank
is
None
:
return
False
else
:
return
True
class
TrainerProc
(
object
):
def
__init__
(
self
):
self
.
proc
=
None
self
.
log_fn
=
None
self
.
log_offset
=
None
self
.
rank
=
None
self
.
local_rank
=
None
self
.
cmd
=
None
def
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
None
):
current_env
=
copy
.
copy
(
os
.
environ
.
copy
())
#paddle broadcast ncclUniqueId use socket, and
#proxy maybe make trainers unreachable, so delete them.
#if we set them to "", grpc will log error message "bad uri"
#so just delete them.
current_env
.
pop
(
"http_proxy"
,
None
)
current_env
.
pop
(
"https_proxy"
,
None
)
procs
=
[]
for
idx
,
t
in
enumerate
(
pod
.
trainers
):
proc_env
=
{
"FLAGS_selected_gpus"
:
"%s"
%
","
.
join
([
str
(
g
)
for
g
in
t
.
gpus
]),
"PADDLE_TRAINER_ID"
:
"%d"
%
t
.
rank
,
"PADDLE_CURRENT_ENDPOINT"
:
"%s"
%
t
.
endpoint
,
"PADDLE_TRAINERS_NUM"
:
"%d"
%
cluster
.
trainers_nranks
(),
"PADDLE_TRAINER_ENDPOINTS"
:
","
.
join
(
cluster
.
trainers_endpoints
())
}
current_env
.
update
(
proc_env
)
logger
.
debug
(
"trainer proc env:{}"
.
format
(
current_env
))
# cmd = [sys.executable, "-u", training_script]
logger
.
info
(
"start trainer proc:{} env:{}"
.
format
(
cmd
,
proc_env
))
fn
=
None
if
log_dir
is
not
None
:
os
.
system
(
"mkdir -p {}"
.
format
(
log_dir
))
fn
=
open
(
"%s/workerlog.%d"
%
(
log_dir
,
idx
),
"a"
)
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
,
stdout
=
fn
,
stderr
=
fn
)
else
:
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
)
tp
=
TrainerProc
()
tp
.
proc
=
proc
tp
.
rank
=
t
.
rank
tp
.
local_rank
=
idx
tp
.
log_fn
=
fn
tp
.
log_offset
=
fn
.
tell
()
if
fn
else
None
tp
.
cmd
=
cmd
procs
.
append
(
proc
)
return
procs
core/engine/local_cluster.py
浏览文件 @
1a044578
...
...
@@ -19,9 +19,14 @@ import copy
import
os
import
sys
import
subprocess
import
logging
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.utils
import
envs
import
paddlerec.core.engine.cluster_utils
as
cluster_utils
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
LocalClusterEngine
(
Engine
):
...
...
@@ -97,21 +102,49 @@ class LocalClusterEngine(Engine):
stderr
=
fn
,
cwd
=
os
.
getcwd
())
procs
.
append
(
proc
)
elif
fleet_mode
.
upper
()
==
"COLLECTIVE"
:
selected_gpus
=
self
.
envs
[
"selected_gpus"
].
split
(
","
)
cuda_visible_devices
=
os
.
getenv
(
"CUDA_VISIBLE_DEVICES"
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
selected_gpus
=
[
x
.
strip
()
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
)
]
else
:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list
=
cuda_visible_devices
.
split
(
','
)
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
):
assert
x
in
cuda_visible_devices_list
,
"Can't find "
\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."
\
%
(
x
,
cuda_visible_devices
)
selected_gpus
=
[
cuda_visible_devices_list
.
index
(
x
.
strip
())
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
)
]
selected_gpus_num
=
len
(
selected_gpus
)
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
print
(
"use_paddlecloud_flag:{}"
.
format
(
cluster_utils
.
use_paddlecloud
()))
if
cluster_utils
.
use_paddlecloud
():
cluster
,
pod
=
cluster_utils
.
get_cloud_cluster
(
selected_gpus
)
logger
.
info
(
"get cluster from cloud:{}"
.
format
(
cluster
))
procs
=
cluster_utils
.
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
logs_dir
)
else
:
# trainers_num = 1 or not use paddlecloud ips="a,b"
for
i
in
range
(
selected_gpus_num
-
1
):
while
True
:
new_port
=
envs
.
find_free_port
()
if
new_port
not
in
ports
:
ports
.
append
(
new_port
)
break
user_endpoints
=
","
.
join
([
"127.0.0.1:"
+
str
(
x
)
for
x
in
ports
])
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
user_endpoints
=
","
.
join
(
[
"127.0.0.1:"
+
str
(
x
)
for
x
in
ports
])
for
i
in
range
(
selected_gpus_num
):
current_env
.
update
({
"PADDLE_TRAINER_ENDPOINTS"
:
user_endpoints
,
...
...
models/multitask/mmoe/config.yaml
浏览文件 @
1a044578
...
...
@@ -49,10 +49,12 @@ runner:
save_checkpoint_path
:
"
increment"
save_inference_path
:
"
inference"
print_interval
:
1
phases
:
[
train
]
-
name
:
infer_runner
class
:
infer
init_model_path
:
"
increment/1"
device
:
cpu
phases
:
[
infer
]
phase
:
-
name
:
train
...
...
models/rank/dnn/README.md
浏览文件 @
1a044578
...
...
@@ -30,13 +30,12 @@
### 一键下载训练及测试数据
```
bash
sh
download_data
.sh
sh
run
.sh
```
执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。全量训练数据放置于
`./train_data_full/`
,全量测试数据放置于
`./test_data_full/`
,用于快速验证的训练数据与测试数据放置于
`./train_data/`
与
`./
test_data/`
。
进入models/rank/dnn/data目录下,执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。原始的全量数据放置于
`./train_data_full/`
,原始的全量测试数据放置于
`./test_data_full/`
,原始的用于快速验证的训练数据与测试数据放置于
`./train_data/`
与
`./test_data/`
。处理后的全量训练数据放置于
`./slot_train_data_full/`
,处理后的全量测试数据放置于
`./slot_test_data_full/`
,处理后的用于快速验证的训练数据与测试数据放置于
`./slot_train_data/`
与
`./slot_
test_data/`
。
执行该脚本的理想输出为:
```
bash
>
sh download_data.sh
--2019-11-26
06:31:33-- https://fleet.bj.bcebos.com/ctr_data.tar.gz
Resolving fleet.bj.bcebos.com... 10.180.112.31
Connecting to fleet.bj.bcebos.com|10.180.112.31|:443... connected.
...
...
@@ -100,7 +99,7 @@ def get_dataset(inputs, args)
3.
创建一个子类,继承dataset的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用
`MultiSlotDataGenerator`
;若已经完成了预处理并保存为数据文件,可以直接以
`string`
的方式进行读取,使用
`MultiSlotStringDataGenerator`
,能够进一步加速。在示例代码,我们继承并实现了名为
`CriteoDataset`
的dataset子类,使用
`MultiSlotDataGenerator`
方法。
4.
继承并实现基类中的
`generate_sample`
函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.)
5.
在这个可以迭代的函数中,如示例代码中的
`def reader()`
,我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。
6.
最后,我们需要将数据整理为特定的格式,才能够被dataset正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的
`inputs`
必须是严格一一对应的
,并转换为类似字典的形式。在示例代码中,我们使用
`zip`
的方法将参数名与数值构成的元组组成了一个list,并将其yield输出。如果展开来看,我们输出的数据形如
`[('dense_feature',[value]),('C1',[value]),('C2',[value]),...,('C26',[value]),('label',[value])]`
6.
最后,我们需要将数据整理为特定的格式,才能够被dataset正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的
`inputs`
必须是严格一一对应的
。在示例代码中,我们将数据整理成
`click:value dense_feature:value ... dense_feature:value 1:value ... 26:value`
的格式。用print输出是因为我们在run.sh中将结果重定向到slot_train_data等文件中,由模型直接读取。在用户自定义使用时,可以使用
`zip`
的方法将参数名与数值构成的元组组成了一个list,并将其yield输出,并在config.yaml中的data_converter参数指定reader的路径。
```
python
...
...
@@ -113,11 +112,22 @@ hash_dim_ = 1000001
continuous_range_
=
range
(
1
,
14
)
categorical_range_
=
range
(
14
,
40
)
class
CriteoDataset
(
dg
.
MultiSlotDataGenerator
):
"""
DacDataset: inheritance MultiSlotDataGeneratior, Implement data reading
Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
"""
def
generate_sample
(
self
,
line
):
"""
Read the data line by line and process it as a dictionary
"""
def
reader
():
"""
This function needs to be implemented by the user, based on data format
"""
features
=
line
.
rstrip
(
'
\n
'
).
split
(
'
\t
'
)
dense_feature
=
[]
sparse_feature
=
[]
...
...
@@ -137,11 +147,16 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
for
idx
in
categorical_range_
:
feature_name
.
append
(
"C"
+
str
(
idx
-
13
))
feature_name
.
append
(
"label"
)
yield
zip
(
feature_name
,
[
dense_feature
]
+
sparse_feature
+
[
label
])
s
=
"click:"
+
str
(
label
[
0
])
for
i
in
dense_feature
:
s
+=
" dense_feature:"
+
str
(
i
)
for
i
in
range
(
1
,
1
+
len
(
categorical_range_
)):
s
+=
" "
+
str
(
i
)
+
":"
+
str
(
sparse_feature
[
i
-
1
][
0
])
print
(
s
.
strip
())
# add print for data preprocessing
return
reader
d
=
CriteoDataset
()
d
.
run_from_stdin
()
```
...
...
@@ -149,117 +164,124 @@ d.run_from_stdin()
我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令
`cat 数据文件 | python dataset读取python文件`
进行dataset代码的调试:
```
bash
cat
train_data/part-0 | python
dataset_generator
.py
cat
train_data/part-0 | python
get_slot_data
.py
```
输出的数据格式如下:
`
dense_input:size ; dense_input:value ; sparse_input:size ; sparse_input:value ; ... ; sparse_input:size ; sparse_input:value ; label:size ; label
:value `
`
label:value dense_input:value ... dense_input:value sparse_input:value ... sparse_input
:value `
理想的输出为(截取了一个片段):
```
bash
...
13 0.05 0.00663349917081 0.05 0.0 0.02159375 0.008 0.15 0.04 0.362 0.1 0.2 0.0 0.04 1 715353 1 817085 1 851010 1 833725 1 286835 1 948614 1 881652 1 507110 1 27346 1 646986 1 643076 1 200960 1 18464 1 202774 1 532679 1 729573 1 342789 1 562805 1 880474 1 984402 1 666449 1 26235 1 700326 1 452909 1 884722 1 787527 1 0
click:0 dense_feature:0.05 dense_feature:0.00663349917081 dense_feature:0.05 dense_feature:0.0 dense_feature:0.02159375 dense_feature:0.008 dense_feature:0.15 dense_feature:0.04 dense_feature:0.362 dense_feature:0.1 dense_feature:0.2 dense_feature:0.0 dense_feature:0.04 1:715353 2:817085 3:851010 4:833725 5:286835 6:948614 7:881652 8:507110 9:27346 10:646986 11:643076 12:200960 13:18464 14:202774 15:532679 16:729573 17:342789 18:562805 19:880474 20:984402 21:666449 22:26235 23:700326 24:452909 25:884722 26:787527
...
```
#
## 模型组网
### 数据输入声明
正如数据准备章节所介绍,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:
`dense_input`
用于输入连续数据,维度由超参数
`dense_feature_dim`
指定,数据类型是归一化后的浮点型数据。
`sparse_input_ids`
用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为
`C1~C26`
的26个稀疏参数输入,并设置
`lod_level=1`
,代表其为变长数据,数据类型为整数;最后是每条样本的
`label`
,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。
在Paddle中数据输入的声明使用
`paddle.fluid.data()`
,会创建指定类型的占位符,数据IO会依据此定义进行数据的输入。
```
python
dense_input
=
fluid
.
data
(
name
=
"dense_input"
,
shape
=
[
-
1
,
args
.
dense_feature_dim
],
dtype
=
"float32"
)
sparse_input_ids
=
[
fluid
.
data
(
name
=
"C"
+
str
(
i
),
shape
=
[
-
1
,
1
],
lod_level
=
1
,
dtype
=
"int64"
)
for
i
in
range
(
1
,
27
)
]
label
=
fluid
.
data
(
name
=
"label"
,
shape
=
[
-
1
,
1
],
dtype
=
"int64"
)
inputs
=
[
dense_input
]
+
sparse_input_ids
+
[
label
]
```
正如数据准备章节所介绍,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:
`dense_input`
用于输入连续数据,维度由超参数
`dense_input_dim`
指定,数据类型是归一化后的浮点型数据。
`sparse_inputs`
用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为
`1~26`
的26个稀疏参数输入,数据类型为整数;最后是每条样本的
`label`
,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。
### CTR-DNN模型组网
CTR-DNN模型的组网比较直观,本质是一个二分类任务,代码参考
`model.py`
。模型主要组成是一个
`Embedding`
层,
三
个
`FC`
层,以及相应的分类任务的loss计算和auc计算。
CTR-DNN模型的组网比较直观,本质是一个二分类任务,代码参考
`model.py`
。模型主要组成是一个
`Embedding`
层,
四
个
`FC`
层,以及相应的分类任务的loss计算和auc计算。
#### Embedding层
首先介绍Embedding层的搭建方式:
`Embedding`
层的输入是
`sparse_input`
,
shape由超参的
`sparse_feature_dim`
和
`embedding_siz
e`
定义。需要特别解释的是
`is_sparse`
参数,当我们指定
`is_sprase=True`
后,计算图会将该参数视为稀疏参数,反向更新以及分布式通信时,都以稀疏的方式进行,会极大的提升运行效率,同时保证效果一致。
首先介绍Embedding层的搭建方式:
`Embedding`
层的输入是
`sparse_input`
,
由超参的
`sparse_feature_number`
和
`sparse_feature_dimshap
e`
定义。需要特别解释的是
`is_sparse`
参数,当我们指定
`is_sprase=True`
后,计算图会将该参数视为稀疏参数,反向更新以及分布式通信时,都以稀疏的方式进行,会极大的提升运行效率,同时保证效果一致。
各个稀疏的输入通过Embedding层后,将其合并起来,置于一个list内,以方便进行concat的操作。
```
python
def
embedding_layer
(
input
):
return
fluid
.
layers
.
embedding
(
if
self
.
distributed_embedding
:
emb
=
fluid
.
contrib
.
layers
.
sparse_embedding
(
input
=
input
,
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()))
else
:
emb
=
fluid
.
layers
.
embedding
(
input
=
input
,
is_sparse
=
True
,
size
=
[
args
.
sparse_feature_dim
,
args
.
embedding_size
],
is_distributed
=
self
.
is_distributed
,
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()),
)
initializer
=
fluid
.
initializer
.
Uniform
()))
emb_sum
=
fluid
.
layers
.
sequence_pool
(
input
=
emb
,
pool_type
=
'sum'
)
return
emb_sum
sparse_embed_seq
=
list
(
map
(
embedding_layer
,
inputs
[
1
:
-
1
]
))
# [C1~C26]
sparse_embed_seq
=
list
(
map
(
embedding_layer
,
self
.
sparse_inputs
))
# [C1~C26]
```
#### FC层
将离散数据通过embedding查表得到的值,与连续数据的输入进行
`concat`
操作,合为一个整体输入,作为全链接层的原始输入。我们共设计了
3层FC,每层FC的输出维度都为400
,每层FC都后接一个
`relu`
激活函数,每层FC的初始化方式为符合正态分布的随机初始化,标准差与上一层的输出维度的平方根成反比。
将离散数据通过embedding查表得到的值,与连续数据的输入进行
`concat`
操作,合为一个整体输入,作为全链接层的原始输入。我们共设计了
4层FC,每层FC的输出维度由超参
`fc_sizes`
指定
,每层FC都后接一个
`relu`
激活函数,每层FC的初始化方式为符合正态分布的随机初始化,标准差与上一层的输出维度的平方根成反比。
```
python
concated
=
fluid
.
layers
.
concat
(
sparse_embed_seq
+
inputs
[
0
:
1
],
axis
=
1
)
concated
=
fluid
.
layers
.
concat
(
sparse_embed_seq
+
[
self
.
dense_input
],
axis
=
1
)
fcs
=
[
concated
]
hidden_layers
=
envs
.
get_global_env
(
"hyper_parameters.fc_sizes"
)
for
size
in
hidden_layers
:
output
=
fluid
.
layers
.
fc
(
input
=
fcs
[
-
1
],
size
=
size
,
act
=
'relu'
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1.0
/
math
.
sqrt
(
fcs
[
-
1
].
shape
[
1
]))))
fcs
.
append
(
output
)
fc1
=
fluid
.
layers
.
fc
(
input
=
concated
,
size
=
400
,
act
=
"relu"
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1
/
math
.
sqrt
(
concated
.
shape
[
1
]))),
)
fc2
=
fluid
.
layers
.
fc
(
input
=
fc1
,
size
=
400
,
act
=
"relu"
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1
/
math
.
sqrt
(
fc1
.
shape
[
1
]))),
)
fc3
=
fluid
.
layers
.
fc
(
input
=
fc2
,
size
=
400
,
act
=
"relu"
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1
/
math
.
sqrt
(
fc2
.
shape
[
1
]))),
)
```
#### Loss及Auc计算
-
预测的结果通过一个输出shape为2的FC层给出,该FC层的激活函数是softmax,会给出每条样本分属于正负样本的概率。
-
每条样本的损失由交叉熵给出,交叉熵的输入维度为[batch_size,2],数据类型为float,label的输入维度为[batch_size,1],数据类型为int。
-
该batch的损失
`avg_cost`
是各条样本的损失之和
-
我们同时还会计算预测的auc,auc的结果由
`fluid.layers.auc()`
给出,该层的返回值有三个,分别是
全局auc:
`auc_var`
,当前batch的auc:
`batch_auc_var`
,以及auc_states:
`auc_states
`
,auc_states包含了
`batch_stat_pos, batch_stat_neg, stat_pos, stat_neg`
信息。
`batch_auc`
我们取近20个batch的平均,由参数
`slide_steps=20`
指定,roc曲线的离散化的临界数值设置为4096,由
`num_thresholds=2**12`
指定。
-
我们同时还会计算预测的auc,auc的结果由
`fluid.layers.auc()`
给出,该层的返回值有三个,分别是
从第一个batch累计到当前batch的全局auc:
`auc`
,最近几个batch的auc:
`batch_auc`
,以及auc_states:
`_
`
,auc_states包含了
`batch_stat_pos, batch_stat_neg, stat_pos, stat_neg`
信息。
`batch_auc`
我们取近20个batch的平均,由参数
`slide_steps=20`
指定,roc曲线的离散化的临界数值设置为4096,由
`num_thresholds=2**12`
指定。
```
predict = fluid.layers.fc(
input=fc3
,
input=fcs[-1]
,
size=2,
act="softmax",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))),
)
cost = fluid.layers.cross_entropy(input=predict, label=inputs[-1])
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=inputs[-1])
auc_var, batch_auc_var, auc_states = fluid.layers.auc(
input=predict,
label=inputs[-1],
scale=1 / math.sqrt(fcs[-1].shape[1]))))
self.predict = predict
auc, batch_auc, _ = fluid.layers.auc(input=self.predict,label=self.label_input,
num_thresholds=2**12,
slide_steps=20)
```
完成上述组网后,我们最终可以通过训练拿到
`avg_cost`
与
`auc`
两个重要指标。
cost = fluid.layers.cross_entropy(
input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_mean(cost)
```
完成上述组网后,我们最终可以通过训练拿到
`BATCH_AUC`
与
`auc`
两个重要指标。
```
PaddleRec: Runner single_cpu_infer Begin
Executor Mode: infer
processor_register begin
Running SingleInstance.
Running SingleNetwork.
Running SingleInferStartup.
Running SingleInferRunner.
load persistables from increment_dnn/3
batch: 20, BATCH_AUC: [0.75670043], AUC: [0.77490453]
batch: 40, BATCH_AUC: [0.77020144], AUC: [0.77490437]
batch: 60, BATCH_AUC: [0.77464683], AUC: [0.77490435]
batch: 80, BATCH_AUC: [0.76858989], AUC: [0.77490416]
batch: 100, BATCH_AUC: [0.75728286], AUC: [0.77490362]
batch: 120, BATCH_AUC: [0.75007016], AUC: [0.77490286]
...
batch: 720, BATCH_AUC: [0.76840144], AUC: [0.77489881]
batch: 740, BATCH_AUC: [0.76659033], AUC: [0.77489854]
batch: 760, BATCH_AUC: [0.77332639], AUC: [0.77489849]
batch: 780, BATCH_AUC: [0.78361653], AUC: [0.77489874]
Infer phase2 of epoch increment_dnn/3 done, use time: 52.7707588673, global metrics: BATCH_AUC=[0.78361653], AUC=[0.77489874]
PaddleRec Finish
```
## 流式训练(OnlineLearning)任务启动及配置流程
...
...
@@ -387,5 +409,5 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc(
```
4.
准备好数据后, 即可按照标准的训练流程进行流式训练了
```
shell
python
-m
paddlerec.run
-m
models/r
erank/ctr-
dnn/config.yaml
python
-m
paddlerec.run
-m
models/r
ank/
dnn/config.yaml
```
models/rank/dnn/data/get_slot_data.py
浏览文件 @
1a044578
...
...
@@ -61,8 +61,7 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
s
+=
" dense_feature:"
+
str
(
i
)
for
i
in
range
(
1
,
1
+
len
(
categorical_range_
)):
s
+=
" "
+
str
(
i
)
+
":"
+
str
(
sparse_feature
[
i
-
1
][
0
])
print
(
s
.
strip
())
yield
None
print
(
s
.
strip
())
# add print for data preprocessing
return
reader
...
...
models/recall/word2vec/README.md
浏览文件 @
1a044578
...
...
@@ -222,15 +222,18 @@ Infer phase2 of epoch 3 done, use time: 4.43099021912, global metrics: acc=[1.]
## 论文复现
1.
用原论文的完整数据复现论文效果需要在config.yaml修改超参:
```
- name: dataset_train
batch_size: 100 # 1. 修改batch_size为100
type: DataLoader
data_path: "{workspace}/data/all_train" # 2. 修改数据为全量训练数据
word_count_dict_path: "{workspace}/data/all_dict/
word_count_dict.txt" # 3. 修改词表为全量词表
word_count_dict_path: "{workspace}/data/all_dict/word_count_dict.txt" # 3. 修改词表为全量词表
data_converter: "{workspace}/w2v_reader.py"
- name: dataset_infer
data_path: "{workspace}/data/all_test" # 4. 修改数据为全量测试数据
word_id_dict_path: "{workspace}/data/all_dict/word_id_dict.txt" # 5. 修改词表为全量词表
-
name: single_cpu_train
-
epochs: # 4. 修改config.yaml中runner的epochs为5。
```
修改后运行方案:修改config.yaml中的'workspace'为config.yaml的目录位置,执行
```
...
...
@@ -239,7 +242,7 @@ python -m paddlerec.run -m /home/your/dir/config.yaml #调试模式 直接指定
2.
使用自定义预测程序预测全量测试集:
```
python infer.py --test_dir ./data/all_test --dict_path ./data/all_dict/word_id_dict.txt --batch_size
20000 --model_dir ./increment_w2v/ --start_index 0 --last_index 5
--emb_size 300
python infer.py --test_dir ./data/all_test --dict_path ./data/all_dict/word_id_dict.txt --batch_size
10000 --model_dir ./increment_w2v/ --start_index 0 --last_index 4
--emb_size 300
```
结论:使用cpu训练5轮,自定义预测准确率为0.540,每轮训练时间7小时左右。
...
...
models/recall/youtube_dnn/README.md
浏览文件 @
1a044578
...
...
@@ -8,7 +8,7 @@
├── data.txt
├── test
├── data.txt
├── generate_ramdom_data # 随机训练数据生成文件
├── generate_ramdom_data
.py
# 随机训练数据生成文件
├── __init__.py
├── README.md # 文档
├── model.py #模型文件
...
...
@@ -107,7 +107,7 @@ python infer.py --use_gpu 1 --test_epoch 19 --inference_model_dir ./inference_yo
```
### 运行
```
python -m paddlerec.run -m paddlerec.models.recall.
w2v
python -m paddlerec.run -m paddlerec.models.recall.
youtube_dnn
```
### 结果展示
...
...
run.py
浏览文件 @
1a044578
...
...
@@ -348,6 +348,7 @@ def cluster_engine(args):
cluster_envs
[
"fleet_mode"
]
=
fleet_mode
cluster_envs
[
"engine_role"
]
=
"WORKER"
cluster_envs
[
"log_dir"
]
=
"logs"
cluster_envs
[
"train.trainer.trainer"
]
=
trainer
cluster_envs
[
"train.trainer.engine"
]
=
"cluster"
cluster_envs
[
"train.trainer.executor_mode"
]
=
executor_mode
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录