Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
PaddleRec
提交
6de9c8da
P
PaddleRec
项目概览
PaddlePaddle
/
PaddleRec
通知
68
Star
12
Fork
5
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
27
列表
看板
标记
里程碑
合并请求
10
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
P
PaddleRec
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
27
Issue
27
列表
看板
标记
里程碑
合并请求
10
合并请求
10
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6de9c8da
编写于
9月 28, 2020
作者:
Y
yinhaofeng
提交者:
GitHub
9月 28, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'master' into dssm
上级
2cf8c6f0
3b9c100b
变更
32
隐藏空白更改
内联
并排
Showing
32 changed file
with
2393 addition
and
202 deletion
+2393
-202
core/engine/cluster/cluster.py
core/engine/cluster/cluster.py
+38
-2
core/engine/cluster_utils.py
core/engine/cluster_utils.py
+324
-0
core/engine/local_cluster.py
core/engine/local_cluster.py
+63
-30
core/trainer.py
core/trainer.py
+3
-4
core/trainers/framework/network.py
core/trainers/framework/network.py
+13
-11
core/trainers/framework/runner.py
core/trainers/framework/runner.py
+12
-6
doc/custom_reader.md
doc/custom_reader.md
+309
-0
models/multitask/mmoe/config.yaml
models/multitask/mmoe/config.yaml
+2
-0
models/rank/dnn/README.md
models/rank/dnn/README.md
+103
-81
models/rank/dnn/data/get_slot_data.py
models/rank/dnn/data/get_slot_data.py
+1
-2
models/recall/gru4rec/README.md
models/recall/gru4rec/README.md
+206
-0
models/recall/gru4rec/config.yaml
models/recall/gru4rec/config.yaml
+23
-18
models/recall/gru4rec/data/convert_format.py
models/recall/gru4rec/data/convert_format.py
+48
-0
models/recall/gru4rec/data/download.py
models/recall/gru4rec/data/download.py
+61
-0
models/recall/gru4rec/data/preprocess.py
models/recall/gru4rec/data/preprocess.py
+70
-0
models/recall/gru4rec/data/text2paddle.py
models/recall/gru4rec/data/text2paddle.py
+115
-0
models/recall/gru4rec/data_prepare.sh
models/recall/gru4rec/data_prepare.sh
+30
-0
models/recall/gru4rec/model.py
models/recall/gru4rec/model.py
+5
-4
models/recall/word2vec/README.md
models/recall/word2vec/README.md
+7
-4
models/recall/youtube_dnn/README.md
models/recall/youtube_dnn/README.md
+2
-2
models/treebased/tdm/README.md
models/treebased/tdm/README.md
+1
-0
models/treebased/tdm/build_tree.md
models/treebased/tdm/build_tree.md
+0
-19
models/treebased/tdm/config.yaml
models/treebased/tdm/config.yaml
+9
-19
models/treebased/tdm/gen_tree/README.md
models/treebased/tdm/gen_tree/README.md
+120
-0
models/treebased/tdm/gen_tree/__init__.py
models/treebased/tdm/gen_tree/__init__.py
+17
-0
models/treebased/tdm/gen_tree/cluster.py
models/treebased/tdm/gen_tree/cluster.py
+311
-0
models/treebased/tdm/gen_tree/emb_util.py
models/treebased/tdm/gen_tree/emb_util.py
+73
-0
models/treebased/tdm/gen_tree/gen_tree.py
models/treebased/tdm/gen_tree/gen_tree.py
+52
-0
models/treebased/tdm/gen_tree/tree_builder.py
models/treebased/tdm/gen_tree/tree_builder.py
+46
-0
models/treebased/tdm/gen_tree/tree_impl.py
models/treebased/tdm/gen_tree/tree_impl.py
+122
-0
models/treebased/tdm/gen_tree/tree_search_util.py
models/treebased/tdm/gen_tree/tree_search_util.py
+206
-0
run.py
run.py
+1
-0
未找到文件。
core/engine/cluster/cluster.py
浏览文件 @
6de9c8da
...
...
@@ -19,10 +19,16 @@ import copy
import
os
import
subprocess
import
warnings
import
sys
import
logging
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.factory
import
TrainerFactory
from
paddlerec.core.utils
import
envs
import
paddlerec.core.engine.cluster_utils
as
cluster_utils
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
ClusterEngine
(
Engine
):
...
...
@@ -47,8 +53,38 @@ class ClusterEngine(Engine):
self
.
backend
))
def
start_worker_procs
(
self
):
trainer
=
TrainerFactory
.
create
(
self
.
trainer
)
trainer
.
run
()
if
(
envs
.
get_runtime_environ
(
"fleet_mode"
)
==
"COLLECTIVE"
):
#trainer_ports = os.getenv("TRAINER_PORTS", None).split(",")
cuda_visible_devices
=
os
.
getenv
(
"CUDA_VISIBLE_DEVICES"
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
selected_gpus
=
range
(
int
(
os
.
getenv
(
"TRAINER_GPU_CARD_COUNT"
)))
else
:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list
=
cuda_visible_devices
.
split
(
','
)
for
x
in
range
(
int
(
os
.
getenv
(
"TRAINER_GPU_CARD_COUNT"
))):
assert
x
in
cuda_visible_devices_list
,
"Can't find "
\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."
\
%
(
x
,
cuda_visible_devices
)
selected_gpus
=
[
cuda_visible_devices_list
.
index
(
x
)]
print
(
"selected_gpus:{}"
.
format
(
selected_gpus
))
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
logs_dir
=
envs
.
get_runtime_environ
(
"log_dir"
)
print
(
"use_paddlecloud_flag:{}"
.
format
(
cluster_utils
.
use_paddlecloud
()))
if
cluster_utils
.
use_paddlecloud
():
cluster
,
pod
=
cluster_utils
.
get_cloud_cluster
(
selected_gpus
)
logger
.
info
(
"get cluster from cloud:{}"
.
format
(
cluster
))
procs
=
cluster_utils
.
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
logs_dir
)
print
(
"cluster:{}"
.
format
(
cluster
))
print
(
"pod:{}"
.
format
(
pod
))
else
:
trainer
=
TrainerFactory
.
create
(
self
.
trainer
)
trainer
.
run
()
def
start_master_procs
(
self
):
if
self
.
backend
==
"PADDLECLOUD"
:
...
...
core/engine/cluster_utils.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
functools
import
logging
import
socket
import
time
import
os
import
signal
import
copy
import
sys
import
subprocess
from
contextlib
import
closing
import
socket
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
Cluster
(
object
):
def
__init__
(
self
,
hdfs
):
self
.
job_server
=
None
self
.
pods
=
[]
self
.
hdfs
=
None
self
.
job_stage_flag
=
None
def
__str__
(
self
):
return
"job_server:{} pods:{} job_stage_flag:{} hdfs:{}"
.
format
(
self
.
job_server
,
[
str
(
pod
)
for
pod
in
self
.
pods
],
self
.
job_stage_flag
,
self
.
hdfs
)
def
__eq__
(
self
,
cluster
):
if
len
(
self
.
pods
)
!=
len
(
cluster
.
pods
):
return
False
for
a
,
b
in
zip
(
self
.
pods
,
cluster
.
pods
):
if
a
!=
b
:
return
False
if
self
.
job_stage_flag
!=
cluster
.
job_stage_flag
:
return
False
return
True
def
__ne__
(
self
,
cluster
):
return
not
self
.
__eq__
(
cluster
)
def
update_pods
(
cluster
):
self
.
pods
=
copy
.
copy
(
cluster
.
pods
)
def
trainers_nranks
(
self
):
return
len
(
self
.
trainers_endpoints
())
def
pods_nranks
(
self
):
return
len
(
self
.
pods
)
def
trainers_endpoints
(
self
):
r
=
[]
for
pod
in
self
.
pods
:
for
t
in
pod
.
trainers
:
r
.
append
(
t
.
endpoint
)
return
r
def
pods_endpoints
(
self
):
r
=
[]
for
pod
in
self
.
pods
:
ep
=
"{}:{}"
.
format
(
pod
.
addr
,
pod
.
port
)
assert
pod
.
port
!=
None
and
pod
.
addr
!=
None
,
"{} not a valid endpoint"
.
format
(
ep
)
r
.
append
(
ep
)
return
r
def
get_pod_by_id
(
self
,
pod_id
):
for
pod
in
self
.
pods
:
if
str
(
pod_id
)
==
str
(
pod
.
id
):
return
pod
return
None
class
JobServer
(
object
):
def
__init__
(
self
):
self
.
endpoint
=
None
def
__str__
(
self
):
return
"{}"
.
format
(
self
.
endpoint
)
def
__eq__
(
self
,
j
):
return
self
.
endpint
==
j
.
endpoint
def
__ne__
(
self
,
j
):
return
not
self
==
j
class
Trainer
(
object
):
def
__init__
(
self
):
self
.
gpus
=
[]
self
.
endpoint
=
None
self
.
rank
=
None
def
__str__
(
self
):
return
"gpu:{} endpoint:{} rank:{}"
.
format
(
self
.
gpus
,
self
.
endpoint
,
self
.
rank
)
def
__eq__
(
self
,
t
):
if
len
(
self
.
gpus
)
!=
len
(
t
.
gpus
):
return
False
if
self
.
endpoint
!=
t
.
endpoint
or
\
self
.
rank
!=
t
.
rank
:
return
False
for
a
,
b
in
zip
(
self
.
gpus
,
t
.
gpus
):
if
a
!=
b
:
return
False
return
True
def
__ne__
(
self
,
t
):
return
not
self
==
t
def
rank
(
self
):
return
self
.
rank
class
Pod
(
object
):
def
__init__
(
self
):
self
.
rank
=
None
self
.
id
=
None
self
.
addr
=
None
self
.
port
=
None
self
.
trainers
=
[]
self
.
gpus
=
[]
def
__str__
(
self
):
return
"rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}"
.
format
(
self
.
rank
,
self
.
id
,
self
.
addr
,
self
.
port
,
self
.
gpus
,
[
str
(
t
)
for
t
in
self
.
trainers
])
def
__eq__
(
self
,
pod
):
if
self
.
rank
!=
pod
.
rank
or
\
self
.
id
!=
pod
.
id
or
\
self
.
addr
!=
pod
.
addr
or
\
self
.
port
!=
pod
.
port
:
logger
.
debug
(
"pod {} != pod"
.
format
(
self
,
pod
))
return
False
if
len
(
self
.
trainers
)
!=
len
(
pod
.
trainers
):
logger
.
debug
(
"trainers {} != {}"
.
format
(
self
.
trainers
,
pod
.
trainers
))
return
False
for
i
in
range
(
len
(
self
.
trainers
)):
if
self
.
trainers
[
i
]
!=
pod
.
trainers
[
i
]:
logger
.
debug
(
"trainer {} != {}"
.
format
(
self
.
trainers
[
i
],
pod
.
trainers
[
i
]))
return
False
return
True
def
__ne__
(
self
,
pod
):
return
not
self
==
pod
def
parse_response
(
self
,
res_pods
):
pass
def
rank
(
self
):
return
self
.
rank
def
get_visible_gpus
(
self
):
r
=
""
for
g
in
self
.
gpus
:
r
+=
"{},"
.
format
(
g
)
assert
r
!=
""
,
"this pod {} can't see any gpus"
.
format
(
self
)
r
=
r
[:
-
1
]
return
r
def
get_cluster
(
node_ips
,
node_ip
,
paddle_ports
,
selected_gpus
):
assert
type
(
paddle_ports
)
is
list
,
"paddle_ports must be list"
cluster
=
Cluster
(
hdfs
=
None
)
trainer_rank
=
0
for
node_rank
,
ip
in
enumerate
(
node_ips
):
pod
=
Pod
()
pod
.
rank
=
node_rank
pod
.
addr
=
ip
for
i
in
range
(
len
(
selected_gpus
)):
trainer
=
Trainer
()
trainer
.
gpus
.
append
(
selected_gpus
[
i
])
trainer
.
endpoint
=
"%s:%d"
%
(
ip
,
paddle_ports
[
i
])
trainer
.
rank
=
trainer_rank
trainer_rank
+=
1
pod
.
trainers
.
append
(
trainer
)
cluster
.
pods
.
append
(
pod
)
pod_rank
=
node_ips
.
index
(
node_ip
)
return
cluster
,
cluster
.
pods
[
pod_rank
]
def
get_cloud_cluster
(
selected_gpus
,
args_port
=
None
):
#you can automatically get ip info while using paddlecloud multi nodes mode.
node_ips
=
os
.
getenv
(
"PADDLE_TRAINERS"
)
assert
node_ips
is
not
None
,
"PADDLE_TRAINERS should not be None"
print
(
"node_ips:{}"
.
format
(
node_ips
))
node_ip
=
os
.
getenv
(
"POD_IP"
)
assert
node_ip
is
not
None
,
"POD_IP should not be None"
print
(
"node_ip:{}"
.
format
(
node_ip
))
node_rank
=
os
.
getenv
(
"PADDLE_TRAINER_ID"
)
assert
node_rank
is
not
None
,
"PADDLE_TRAINER_ID should not be None"
print
(
"node_rank:{}"
.
format
(
node_rank
))
node_ips
=
node_ips
.
split
(
","
)
num_nodes
=
len
(
node_ips
)
node_rank
=
int
(
node_rank
)
started_port
=
args_port
print
(
"num_nodes:"
,
num_nodes
)
if
num_nodes
>
1
:
try
:
paddle_port
=
int
(
os
.
getenv
(
"PADDLE_PORT"
,
""
))
paddle_port_num
=
int
(
os
.
getenv
(
"TRAINER_PORTS_NUM"
,
""
))
if
paddle_port_num
>=
len
(
selected_gpus
)
and
paddle_port
!=
args_port
:
logger
.
warning
(
"Use Cloud specified port:{}."
.
format
(
paddle_port
))
started_port
=
paddle_port
except
Exception
as
e
:
print
(
e
)
pass
if
started_port
is
None
:
started_port
=
6170
logger
.
debug
(
"parsed from args:node_ips:{}
\
node_ip:{} node_rank:{} started_port:{}"
.
format
(
node_ips
,
node_ip
,
node_rank
,
started_port
))
ports
=
[
x
for
x
in
range
(
started_port
,
started_port
+
len
(
selected_gpus
))]
cluster
,
pod
=
get_cluster
(
node_ips
,
node_ip
,
ports
,
selected_gpus
)
return
cluster
,
cluster
.
pods
[
node_rank
]
def
use_paddlecloud
():
node_ips
=
os
.
getenv
(
"PADDLE_TRAINERS"
,
None
)
node_ip
=
os
.
getenv
(
"POD_IP"
,
None
)
node_rank
=
os
.
getenv
(
"PADDLE_TRAINER_ID"
,
None
)
if
node_ips
is
None
or
node_ip
is
None
or
node_rank
is
None
:
return
False
else
:
return
True
class
TrainerProc
(
object
):
def
__init__
(
self
):
self
.
proc
=
None
self
.
log_fn
=
None
self
.
log_offset
=
None
self
.
rank
=
None
self
.
local_rank
=
None
self
.
cmd
=
None
def
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
None
):
current_env
=
copy
.
copy
(
os
.
environ
.
copy
())
#paddle broadcast ncclUniqueId use socket, and
#proxy maybe make trainers unreachable, so delete them.
#if we set them to "", grpc will log error message "bad uri"
#so just delete them.
current_env
.
pop
(
"http_proxy"
,
None
)
current_env
.
pop
(
"https_proxy"
,
None
)
procs
=
[]
for
idx
,
t
in
enumerate
(
pod
.
trainers
):
proc_env
=
{
"FLAGS_selected_gpus"
:
"%s"
%
","
.
join
([
str
(
g
)
for
g
in
t
.
gpus
]),
"PADDLE_TRAINER_ID"
:
"%d"
%
t
.
rank
,
"PADDLE_CURRENT_ENDPOINT"
:
"%s"
%
t
.
endpoint
,
"PADDLE_TRAINERS_NUM"
:
"%d"
%
cluster
.
trainers_nranks
(),
"PADDLE_TRAINER_ENDPOINTS"
:
","
.
join
(
cluster
.
trainers_endpoints
())
}
current_env
.
update
(
proc_env
)
logger
.
debug
(
"trainer proc env:{}"
.
format
(
current_env
))
# cmd = [sys.executable, "-u", training_script]
logger
.
info
(
"start trainer proc:{} env:{}"
.
format
(
cmd
,
proc_env
))
fn
=
None
if
log_dir
is
not
None
:
os
.
system
(
"mkdir -p {}"
.
format
(
log_dir
))
fn
=
open
(
"%s/workerlog.%d"
%
(
log_dir
,
idx
),
"a"
)
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
,
stdout
=
fn
,
stderr
=
fn
)
else
:
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
)
tp
=
TrainerProc
()
tp
.
proc
=
proc
tp
.
rank
=
t
.
rank
tp
.
local_rank
=
idx
tp
.
log_fn
=
fn
tp
.
log_offset
=
fn
.
tell
()
if
fn
else
None
tp
.
cmd
=
cmd
procs
.
append
(
proc
)
return
procs
core/engine/local_cluster.py
浏览文件 @
6de9c8da
...
...
@@ -19,9 +19,14 @@ import copy
import
os
import
sys
import
subprocess
import
logging
from
paddlerec.core.engine.engine
import
Engine
from
paddlerec.core.utils
import
envs
import
paddlerec.core.engine.cluster_utils
as
cluster_utils
logger
=
logging
.
getLogger
(
"root"
)
logger
.
propagate
=
False
class
LocalClusterEngine
(
Engine
):
...
...
@@ -97,42 +102,70 @@ class LocalClusterEngine(Engine):
stderr
=
fn
,
cwd
=
os
.
getcwd
())
procs
.
append
(
proc
)
elif
fleet_mode
.
upper
()
==
"COLLECTIVE"
:
selected_gpus
=
self
.
envs
[
"selected_gpus"
].
split
(
","
)
cuda_visible_devices
=
os
.
getenv
(
"CUDA_VISIBLE_DEVICES"
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
selected_gpus
=
[
x
.
strip
()
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
)
]
else
:
# change selected_gpus into relative values
# e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
# therefore selected_gpus=0,1,2,3
cuda_visible_devices_list
=
cuda_visible_devices
.
split
(
','
)
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
):
assert
x
in
cuda_visible_devices_list
,
"Can't find "
\
"your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."
\
%
(
x
,
cuda_visible_devices
)
selected_gpus
=
[
cuda_visible_devices_list
.
index
(
x
.
strip
())
for
x
in
self
.
envs
[
"selected_gpus"
].
split
(
","
)
]
selected_gpus_num
=
len
(
selected_gpus
)
for
i
in
range
(
selected_gpus_num
-
1
):
while
True
:
new_port
=
envs
.
find_free_port
()
if
new_port
not
in
ports
:
ports
.
append
(
new_port
)
break
user_endpoints
=
","
.
join
([
"127.0.0.1:"
+
str
(
x
)
for
x
in
ports
])
factory
=
"paddlerec.core.factory"
cmd
=
[
sys
.
executable
,
"-u"
,
"-m"
,
factory
,
self
.
trainer
]
for
i
in
range
(
selected_gpus_num
):
current_env
.
update
({
"PADDLE_TRAINER_ENDPOINTS"
:
user_endpoints
,
"PADDLE_CURRENT_ENDPOINTS"
:
user_endpoints
[
i
],
"PADDLE_TRAINERS_NUM"
:
str
(
worker_num
),
"TRAINING_ROLE"
:
"TRAINER"
,
"PADDLE_TRAINER_ID"
:
str
(
i
),
"FLAGS_selected_gpus"
:
str
(
selected_gpus
[
i
]),
"PADDLEREC_GPU_NUMS"
:
str
(
selected_gpus_num
)
})
os
.
system
(
"mkdir -p {}"
.
format
(
logs_dir
))
fn
=
open
(
"%s/worker.%d"
%
(
logs_dir
,
i
),
"w"
)
log_fns
.
append
(
fn
)
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
,
stdout
=
fn
,
stderr
=
fn
,
cwd
=
os
.
getcwd
())
procs
.
append
(
proc
)
print
(
"use_paddlecloud_flag:{}"
.
format
(
cluster_utils
.
use_paddlecloud
()))
if
cluster_utils
.
use_paddlecloud
():
cluster
,
pod
=
cluster_utils
.
get_cloud_cluster
(
selected_gpus
)
logger
.
info
(
"get cluster from cloud:{}"
.
format
(
cluster
))
procs
=
cluster_utils
.
start_local_trainers
(
cluster
,
pod
,
cmd
,
log_dir
=
logs_dir
)
else
:
# trainers_num = 1 or not use paddlecloud ips="a,b"
for
i
in
range
(
selected_gpus_num
-
1
):
while
True
:
new_port
=
envs
.
find_free_port
()
if
new_port
not
in
ports
:
ports
.
append
(
new_port
)
break
user_endpoints
=
","
.
join
(
[
"127.0.0.1:"
+
str
(
x
)
for
x
in
ports
])
for
i
in
range
(
selected_gpus_num
):
current_env
.
update
({
"PADDLE_TRAINER_ENDPOINTS"
:
user_endpoints
,
"PADDLE_CURRENT_ENDPOINTS"
:
user_endpoints
[
i
],
"PADDLE_TRAINERS_NUM"
:
str
(
worker_num
),
"TRAINING_ROLE"
:
"TRAINER"
,
"PADDLE_TRAINER_ID"
:
str
(
i
),
"FLAGS_selected_gpus"
:
str
(
selected_gpus
[
i
]),
"PADDLEREC_GPU_NUMS"
:
str
(
selected_gpus_num
)
})
os
.
system
(
"mkdir -p {}"
.
format
(
logs_dir
))
fn
=
open
(
"%s/worker.%d"
%
(
logs_dir
,
i
),
"w"
)
log_fns
.
append
(
fn
)
proc
=
subprocess
.
Popen
(
cmd
,
env
=
current_env
,
stdout
=
fn
,
stderr
=
fn
,
cwd
=
os
.
getcwd
())
procs
.
append
(
proc
)
# only wait worker to finish here
for
i
,
proc
in
enumerate
(
procs
):
...
...
core/trainer.py
浏览文件 @
6de9c8da
...
...
@@ -76,9 +76,6 @@ class Trainer(object):
_config
=
envs
.
load_yaml
(
config
)
self
.
_context
[
"env"
]
=
_config
self
.
_context
[
"dataset"
]
=
_config
.
get
(
"dataset"
)
phases
=
[]
if
phase_names
is
None
:
phases
=
_config
.
get
(
"phase"
)
...
...
@@ -86,8 +83,10 @@ class Trainer(object):
for
phase
in
_config
.
get
(
"phase"
):
if
phase
[
"name"
]
in
phase_names
:
phases
.
append
(
phase
)
self
.
_context
[
"phases"
]
=
phases
_config
[
"phase"
]
=
phases
self
.
_context
[
"env"
]
=
_config
self
.
_context
[
"dataset"
]
=
_config
.
get
(
"dataset"
)
print
(
"PaddleRec: Runner {} Begin"
.
format
(
self
.
_runner_name
))
self
.
which_engine
()
self
.
which_device
()
...
...
core/trainers/framework/network.py
浏览文件 @
6de9c8da
...
...
@@ -238,8 +238,8 @@ class PSNetwork(NetworkBase):
else
:
context
[
"fleet"
].
init_worker
()
context
[
"dataset"
]
=
{}
for
dataset
in
context
[
"env"
][
"dataset
"
]:
type
=
envs
.
get_global_env
(
"dataset."
+
dataset
[
"
name"
]
+
for
phase
in
context
[
"env"
][
"phase
"
]:
type
=
envs
.
get_global_env
(
"dataset."
+
phase
[
"dataset_
name"
]
+
".type"
)
if
type
==
"DataLoader"
:
data_loader
=
DataLoader
(
context
)
...
...
@@ -247,9 +247,9 @@ class PSNetwork(NetworkBase):
model
.
_data_loader
)
elif
type
==
"QueueDataset"
:
dataset_class
=
QueueDataset
(
context
)
context
[
"dataset"
][
dataset
[
"name"
]]
=
dataset_class
.
create_dataset
(
dataset
[
"
name"
],
context
)
context
[
"dataset"
][
phase
[
"
dataset_
name"
]]
=
dataset_class
.
create_dataset
(
phase
[
"dataset_
name"
],
context
)
context
[
"status"
]
=
"startup_pass"
def
_build_strategy
(
self
,
context
):
...
...
@@ -336,7 +336,7 @@ class PslibNetwork(NetworkBase):
self
.
_server
(
context
)
else
:
context
[
"dataset"
]
=
{}
for
dataset
in
context
[
"env"
][
"dataset
"
]:
for
phase
in
context
[
"env"
][
"phase
"
]:
type
=
envs
.
get_global_env
(
"dataset."
+
dataset
[
"name"
]
+
".type"
)
if
type
==
"DataLoader"
:
...
...
@@ -363,6 +363,7 @@ class CollectiveNetwork(NetworkBase):
def
build_network
(
self
,
context
):
context
[
"model"
]
=
{}
if
len
(
context
[
"env"
][
"phase"
])
>
1
:
print
(
"CollectiveNetwork phase:{}"
.
format
(
context
[
"env"
][
"phase"
]))
warnings
.
warn
(
"Cluster Train Only Support One Phase."
,
category
=
UserWarning
,
...
...
@@ -407,16 +408,17 @@ class CollectiveNetwork(NetworkBase):
context
[
"model"
][
model_dict
[
"name"
]][
"compiled_program"
]
=
None
context
[
"dataset"
]
=
{}
for
dataset
in
context
[
"env"
][
"dataset"
]:
type
=
envs
.
get_global_env
(
"dataset."
+
dataset
[
"name"
]
+
".type"
)
for
phase
in
context
[
"env"
][
"phase"
]:
type
=
envs
.
get_global_env
(
"dataset."
+
phase
[
"dataset_name"
]
+
".type"
)
if
type
==
"QueueDataset"
:
raise
ValueError
(
"Collective don't support QueueDataset training, please use DataLoader."
)
dataset_class
=
QueueDataset
(
context
)
context
[
"dataset"
][
dataset
[
"
name"
]]
=
dataset_class
.
create_dataset
(
dataset
[
"name"
],
context
)
context
[
"dataset"
][
phase
[
"
dataset_name"
]]
=
dataset_class
.
create_dataset
(
phase
[
"dataset_name"
],
context
)
context
[
"status"
]
=
"startup_pass"
def
_build_strategy
(
self
,
context
):
...
...
core/trainers/framework/runner.py
浏览文件 @
6de9c8da
...
...
@@ -209,9 +209,13 @@ class RunnerBase(object):
if
save_step_interval
>=
1
and
batch_id
%
save_step_interval
==
0
and
context
[
"is_infer"
]
==
False
:
if
context
[
"fleet_mode"
].
upper
()
==
"PS"
:
train_prog
=
context
[
"model"
][
model_dict
[
"name"
]][
"main_program"
]
if
context
[
"is_fleet"
]:
if
context
[
"fleet_mode"
].
upper
()
==
"PS"
:
train_prog
=
context
[
"model"
][
model_dict
[
"name"
]][
"main_program"
]
else
:
train_prog
=
context
[
"model"
][
model_dict
[
"name"
]][
"default_main_program"
]
else
:
train_prog
=
context
[
"model"
][
model_dict
[
"name"
]][
"default_main_program"
]
...
...
@@ -432,9 +436,11 @@ class RunnerBase(object):
dirname
=
envs
.
get_global_env
(
name
+
"save_step_path"
,
None
)
if
dirname
is
None
or
dirname
==
""
:
return
dirname
=
os
.
path
.
join
(
dirname
,
str
(
batch_id
))
logging
.
info
(
"
\t
save batch_id:%d model into:
\"
%s
\"
"
%
(
batch_id
,
dirname
))
dirname
=
os
.
path
.
join
(
dirname
,
"epoch_"
+
str
(
context
[
"current_epoch"
])
+
"_batch_"
+
str
(
batch_id
))
logging
.
info
(
"
\t
save epoch_id:%d, batch_id:%d model into:
\"
%s
\"
"
%
(
context
[
"current_epoch"
],
batch_id
,
dirname
))
if
is_fleet
:
if
context
[
"fleet"
].
worker_index
()
==
0
:
context
[
"fleet"
].
save_persistables
(
context
[
"exe"
],
dirname
)
...
...
doc/custom_reader.md
0 → 100644
浏览文件 @
6de9c8da
# PaddleRec 自定义数据集及Reader
用户自定义数据集及配置异步Reader,需要关注以下几个步骤:
*
[
数据集整理
](
#数据集整理
)
*
[
在模型组网中加入输入占位符
](
#在模型组网中加入输入占位符
)
*
[
Reader实现
](
#Reader的实现
)
*
[
在yaml文件中配置Reader
](
#在yaml文件中配置reader
)
我们以CTR-DNN模型为例,给出了从数据整理,变量定义,Reader写法,调试的完整历程。
*
[
数据及Reader示例-DNN
](
#数据及Reader示例-DNN
)
## 数据集整理
PaddleRec支持模型自定义数据集。
关于数据的tips:
1.
数据量:
PaddleRec面向大规模数据设计,可以轻松支持亿级的数据读取,工业级的数据读写api:`dataset`在搜索、推荐、信息流等业务得到了充分打磨。
2.
文件类型:
支持任意直接可读的文本数据,`dataset`同时支持`.gz`格式的文本压缩数据,无需额外代码,可直接读取。数据样本应以`\n`为标志,按行组织。
3.
文件存放位置:
文件通常存放在训练节点本地,但同时,`dataset`支持使用`hadoop`远程读取数据,数据无需下载到本地,为dataset配置hadoop相关账户及地址即可。
4.
数据类型
Reader处理的是以行为单位的`string`数据,喂入网络的数据需要转为`int`,`float`的数值数据,不支持`string`喂入网络,不建议明文保存及处理训练数据。
5.
Tips
Dataset模式下,训练线程与数据读取线程的关系强相关,为了多线程充分利用,`强烈建议将文件合理的拆为多个小文件`,尤其是在分布式训练场景下,可以均衡各个节点的数据量,同时加快数据的下载速度。
## 在模型组网中加入输入占位符
Reader读取文件后,产出的数据喂入网络,需要有占位符进行接收。占位符在Paddle中使用
`fluid.data`
或
`fluid.layers.data`
进行定义。
`data`
的定义可以参考
[
fluid.data
](
https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/fluid_cn/data_cn.html#data
)
以及
[
fluid.layers.data
](
https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/layers_cn/data_cn.html#data
)
。
加入您希望输入三个数据,分别是维度32的数据A,维度变长的稀疏数据B,以及一个一维的标签数据C,并希望梯度可以经过该变量向前传递,则示例如下:
数据A的定义:
```
python
var_a
=
fluid
.
data
(
name
=
'A'
,
shape
=
[
-
1
,
32
],
dtype
=
'float32'
)
```
数据B的定义,变长数据的使用可以参考
[
LoDTensor
](
https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/basic_concept/lod_tensor.html#cn-user-guide-lod-tensor
)
:
```
python
var_b
=
fluid
.
data
(
name
=
'B'
,
shape
=
[
-
1
,
1
],
lod_level
=
1
,
dtype
=
'int64'
)
```
数据C的定义:
```
python
var_c
=
fluid
.
data
(
name
=
'C'
,
shape
=
[
-
1
,
1
],
dtype
=
'int32'
)
var_c
.
stop_gradient
=
False
```
当我们完成以上三个数据的定义后,在PaddleRec的模型定义中,还需将其加入model基类成员变量
`self._data_var`
```
python
self
.
_data_var
.
append
(
var_a
)
self
.
_data_var
.
append
(
var_b
)
self
.
_data_var
.
append
(
var_c
)
```
至此,我们完成了在组网中定义输入数据的工作。
## Reader的实现
### Reader的实现范式
Reader的逻辑需要一个单独的python文件进行描述。我们试写一个
`test_reader.py`
,实现的具体流程如下:
1.
首先我们需要引入Reader基类
```python
from paddlerec.core.reader import ReaderBase
```
2.
创建一个子类,继承Reader的基类,训练所需Reader命名为
`TrainerReader`
```
python
class
Reader
(
ReaderBase
):
def
init
(
self
):
pass
def
generator_sample
(
self
,
line
):
pass
```
3.
在
`init(self)`
函数中声明一些在数据读取中会用到的变量,必要时可以在
`config.yaml`
文件中配置变量,利用
`env.get_global_env()`
拿到。
比如,我们希望从yaml文件中读取一个数据预处理变量
`avg=10`
,目的是将数据A的数据缩小10倍,可以这样实现:
首先更改yaml文件,在某个hyper_parameters下加入该变量
```yaml
...
hyper_parameters:
reader:
avg: 10
...
```
再更改Reader的init函数
```python
from paddlerec.core.utils import envs
class Reader(ReaderBase):
def init(self):
self.avg = envs.get_global_env("avg", None, "hyper_parameters.reader")
def generator_sample(self, line):
pass
```
4.
继承并实现基类中的
`generate_sample(self, line)`
函数,逐行读取数据。
-
该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.)
-
在这个可以迭代的函数中,如示例代码中的
`def reader()`
,我们定义数据读取的逻辑。以行为单位的数据进行截取,转换及预处理。
-
最后,我们需要将数据整理为特定的格式,才能够被PaddleRec的Reader正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的
`inputs`
必须是严格一一对应的,并转换为类似字典的形式。
示例: 假设数据ABC在文本数据中,每行以这样的形式存储:
```
shell
0.1,0.2,0.3...3.0,3.1,3.2
\t
99999,99998,99997
\t
1
\n
```
则示例代码如下:
```python
from paddlerec.core.utils import envs
class Reader(ReaderBase):
def init(self):
self.avg = envs.get_global_env("avg", None, "hyper_parameters.reader")
def generator_sample(self, line):
def reader(self, line):
# 先分割 '\n', 再以 '\t'为标志分割为list
variables = (line.strip('\n')).split('\t')
# A是第一个元素,并且每个数据之间使用','分割
var_a = variables[0].split(',') # list
var_a = [float(i) / self.avg for i in var_a] # 将str数据转换为float
# B是第二个元素,同样以 ',' 分割
var_b = variables[1].split(',') # list
var_b = [int(i) for i in var_b] # 将str数据转换为int
# C是第三个元素, 只有一个元素,没有分割符
var_c = variables[2]
var_c = int(var_c) # 将str数据转换为int
var_c = [var_c] # 将单独的数据元素置入list中
# 将数据与数据名结合,组织为dict的形式
# 如下,output形式为{ A: var_a, B: var_b, C: var_c}
variable_name = ['A', 'B', 'C']
output = zip(variable_name, [var_a] + [var_b] + [var_c])
# 将数据输出,使用yield方法,将该函数变为了一个可迭代的对象
yield output
```
至此,我们完成了Reader的实现。
### 在yaml文件中配置Reader
在模型的yaml配置文件中,主要的修改是三个,如下
```
yaml
reader
:
batch_size
:
2
class
:
"
{workspace}/criteo_reader.py"
train_data_path
:
"
{workspace}/data/train_data"
```
batch_size: 顾名思义,是小批量训练时的样本大小
class: 运行改模型所需reader的路径
train_data_path: 训练数据所在文件夹
reader_debug_mode: 测试reader语法,及输出是否符合预期的debug模式的开关
## 数据及Reader示例-DNN
### Criteo数据集格式
CTR-DNN训练及测试数据集选用
[
Display Advertising Challenge
](
https://www.kaggle.com/c/criteo-display-ad-challenge/
)
所用的Criteo数据集。该数据集包括两部分:训练集和测试集。训练集包含一段时间内Criteo的部分流量,测试集则对应训练数据后一天的广告点击流量。
每一行数据格式如下所示:
```
bash
<label> <integer feature 1> ... <integer feature 13> <categorical feature 1> ... <categorical feature 26>
```
其中
```<label>```
表示广告是否被点击,点击用1表示,未点击用0表示。
```<integer feature>```
代表数值特征(连续特征),共有13个连续特征。
```<categorical feature>```
代表分类特征(离散特征),共有26个离散特征。相邻两个特征用
```\t```
分隔,缺失特征用空格表示。测试集中
```<label>```
特征已被移除。
### Criteo数据集的预处理
数据预处理共包括两步:
-
将原始训练集按9:1划分为训练集和验证集
-
数值特征(连续特征)需进行归一化处理,但需要注意的是,对每一个特征
```<integer feature i>```
,归一化时用到的最大值并不是用全局最大值,而是取排序后95%位置处的特征值作为最大值,同时保留极值。
### CTR网络输入的定义
正如前所述,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:
`dense_input`
用于输入连续数据,维度由超参数
`dense_feature_dim`
指定,数据类型是归一化后的浮点型数据。
`sparse_input_ids`
用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为
`C1~C26`
的26个稀疏参数输入,并设置
`lod_level=1`
,代表其为变长数据,数据类型为整数;最后是每条样本的
`label`
,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。
在Paddle中数据输入的声明使用
`paddle.fluid.layers.data()`
,会创建指定类型的占位符,数据IO会依据此定义进行数据的输入。
稀疏参数输入的定义:
```
python
def
sparse_inputs
():
ids
=
envs
.
get_global_env
(
"hyper_parameters.sparse_inputs_slots"
,
None
)
sparse_input_ids
=
[
fluid
.
layers
.
data
(
name
=
"S"
+
str
(
i
),
shape
=
[
1
],
lod_level
=
1
,
dtype
=
"int64"
)
for
i
in
range
(
1
,
ids
)
]
return
sparse_input_ids
```
稠密参数输入的定义:
```
python
def
dense_input
():
dim
=
envs
.
get_global_env
(
"hyper_parameters.dense_input_dim"
,
None
)
dense_input_var
=
fluid
.
layers
.
data
(
name
=
"D"
,
shape
=
[
dim
],
dtype
=
"float32"
)
return
dense_input_var
```
标签的定义:
```
python
def
label_input
():
label
=
fluid
.
layers
.
data
(
name
=
"click"
,
shape
=
[
1
],
dtype
=
"int64"
)
return
label
```
组合起来,正确的声明他们:
```
python
self
.
sparse_inputs
=
sparse_inputs
()
self
.
dense_input
=
dense_input
()
self
.
label_input
=
label_input
()
self
.
_data_var
.
append
(
self
.
dense_input
)
for
input
in
self
.
sparse_inputs
:
self
.
_data_var
.
append
(
input
)
self
.
_data_var
.
append
(
self
.
label_input
)
```
### Criteo Reader写法
```
python
# 引入PaddleRec的Reader基类
from
paddlerec.core.reader
import
ReaderBase
# 引入PaddleRec的读取yaml配置文件的方法
from
paddlerec.core.utils
import
envs
# 定义TrainReader,需要继承 paddlerec.core.reader.Reader
class
Reader
(
ReaderBase
):
# 数据预处理逻辑,继承自基类
# 如果无需处理, 使用pass跳过该函数的执行
def
init
(
self
):
self
.
cont_min_
=
[
0
,
-
3
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
]
self
.
cont_max_
=
[
20
,
600
,
100
,
50
,
64000
,
500
,
100
,
50
,
500
,
10
,
10
,
10
,
50
]
self
.
cont_diff_
=
[
20
,
603
,
100
,
50
,
64000
,
500
,
100
,
50
,
500
,
10
,
10
,
10
,
50
]
self
.
hash_dim_
=
envs
.
get_global_env
(
"hyper_parameters.sparse_feature_number"
,
None
)
self
.
continuous_range_
=
range
(
1
,
14
)
self
.
categorical_range_
=
range
(
14
,
40
)
# 读取数据方法,继承自基类
# 实现可以迭代的reader函数,逐行处理数据
def
generate_sample
(
self
,
line
):
"""
Read the data line by line and process it as a dictionary
"""
def
reader
():
"""
This function needs to be implemented by the user, based on data format
"""
features
=
line
.
rstrip
(
'
\n
'
).
split
(
'
\t
'
)
dense_feature
=
[]
sparse_feature
=
[]
for
idx
in
self
.
continuous_range_
:
if
features
[
idx
]
==
""
:
dense_feature
.
append
(
0.0
)
else
:
dense_feature
.
append
(
(
float
(
features
[
idx
])
-
self
.
cont_min_
[
idx
-
1
])
/
self
.
cont_diff_
[
idx
-
1
])
for
idx
in
self
.
categorical_range_
:
sparse_feature
.
append
(
[
hash
(
str
(
idx
)
+
features
[
idx
])
%
self
.
hash_dim_
])
label
=
[
int
(
features
[
0
])]
feature_name
=
[
"D"
]
for
idx
in
self
.
categorical_range_
:
feature_name
.
append
(
"S"
+
str
(
idx
-
13
))
feature_name
.
append
(
"label"
)
yield
zip
(
feature_name
,
[
dense_feature
]
+
sparse_feature
+
[
label
])
return
reader
```
models/multitask/mmoe/config.yaml
浏览文件 @
6de9c8da
...
...
@@ -49,10 +49,12 @@ runner:
save_checkpoint_path
:
"
increment"
save_inference_path
:
"
inference"
print_interval
:
1
phases
:
[
train
]
-
name
:
infer_runner
class
:
infer
init_model_path
:
"
increment/1"
device
:
cpu
phases
:
[
infer
]
phase
:
-
name
:
train
...
...
models/rank/dnn/README.md
浏览文件 @
6de9c8da
...
...
@@ -30,13 +30,12 @@
### 一键下载训练及测试数据
```
bash
sh
download_data
.sh
sh
run
.sh
```
执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。全量训练数据放置于
`./train_data_full/`
,全量测试数据放置于
`./test_data_full/`
,用于快速验证的训练数据与测试数据放置于
`./train_data/`
与
`./
test_data/`
。
进入models/rank/dnn/data目录下,执行该脚本,会从国内源的服务器上下载Criteo数据集,并解压到指定文件夹。原始的全量数据放置于
`./train_data_full/`
,原始的全量测试数据放置于
`./test_data_full/`
,原始的用于快速验证的训练数据与测试数据放置于
`./train_data/`
与
`./test_data/`
。处理后的全量训练数据放置于
`./slot_train_data_full/`
,处理后的全量测试数据放置于
`./slot_test_data_full/`
,处理后的用于快速验证的训练数据与测试数据放置于
`./slot_train_data/`
与
`./slot_
test_data/`
。
执行该脚本的理想输出为:
```
bash
>
sh download_data.sh
--2019-11-26
06:31:33-- https://fleet.bj.bcebos.com/ctr_data.tar.gz
Resolving fleet.bj.bcebos.com... 10.180.112.31
Connecting to fleet.bj.bcebos.com|10.180.112.31|:443... connected.
...
...
@@ -100,7 +99,7 @@ def get_dataset(inputs, args)
3.
创建一个子类,继承dataset的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用
`MultiSlotDataGenerator`
;若已经完成了预处理并保存为数据文件,可以直接以
`string`
的方式进行读取,使用
`MultiSlotStringDataGenerator`
,能够进一步加速。在示例代码,我们继承并实现了名为
`CriteoDataset`
的dataset子类,使用
`MultiSlotDataGenerator`
方法。
4.
继承并实现基类中的
`generate_sample`
函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.)
5.
在这个可以迭代的函数中,如示例代码中的
`def reader()`
,我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。
6.
最后,我们需要将数据整理为特定的格式,才能够被dataset正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的
`inputs`
必须是严格一一对应的
,并转换为类似字典的形式。在示例代码中,我们使用
`zip`
的方法将参数名与数值构成的元组组成了一个list,并将其yield输出。如果展开来看,我们输出的数据形如
`[('dense_feature',[value]),('C1',[value]),('C2',[value]),...,('C26',[value]),('label',[value])]`
6.
最后,我们需要将数据整理为特定的格式,才能够被dataset正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的
`inputs`
必须是严格一一对应的
。在示例代码中,我们将数据整理成
`click:value dense_feature:value ... dense_feature:value 1:value ... 26:value`
的格式。用print输出是因为我们在run.sh中将结果重定向到slot_train_data等文件中,由模型直接读取。在用户自定义使用时,可以使用
`zip`
的方法将参数名与数值构成的元组组成了一个list,并将其yield输出,并在config.yaml中的data_converter参数指定reader的路径。
```
python
...
...
@@ -113,11 +112,22 @@ hash_dim_ = 1000001
continuous_range_
=
range
(
1
,
14
)
categorical_range_
=
range
(
14
,
40
)
class
CriteoDataset
(
dg
.
MultiSlotDataGenerator
):
"""
DacDataset: inheritance MultiSlotDataGeneratior, Implement data reading
Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
"""
def
generate_sample
(
self
,
line
):
"""
Read the data line by line and process it as a dictionary
"""
def
reader
():
"""
This function needs to be implemented by the user, based on data format
"""
features
=
line
.
rstrip
(
'
\n
'
).
split
(
'
\t
'
)
dense_feature
=
[]
sparse_feature
=
[]
...
...
@@ -137,11 +147,16 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
for
idx
in
categorical_range_
:
feature_name
.
append
(
"C"
+
str
(
idx
-
13
))
feature_name
.
append
(
"label"
)
yield
zip
(
feature_name
,
[
dense_feature
]
+
sparse_feature
+
[
label
])
s
=
"click:"
+
str
(
label
[
0
])
for
i
in
dense_feature
:
s
+=
" dense_feature:"
+
str
(
i
)
for
i
in
range
(
1
,
1
+
len
(
categorical_range_
)):
s
+=
" "
+
str
(
i
)
+
":"
+
str
(
sparse_feature
[
i
-
1
][
0
])
print
(
s
.
strip
())
# add print for data preprocessing
return
reader
d
=
CriteoDataset
()
d
.
run_from_stdin
()
```
...
...
@@ -149,117 +164,124 @@ d.run_from_stdin()
我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令
`cat 数据文件 | python dataset读取python文件`
进行dataset代码的调试:
```
bash
cat
train_data/part-0 | python
dataset_generator
.py
cat
train_data/part-0 | python
get_slot_data
.py
```
输出的数据格式如下:
`
dense_input:size ; dense_input:value ; sparse_input:size ; sparse_input:value ; ... ; sparse_input:size ; sparse_input:value ; label:size ; label
:value `
`
label:value dense_input:value ... dense_input:value sparse_input:value ... sparse_input
:value `
理想的输出为(截取了一个片段):
```
bash
...
13 0.05 0.00663349917081 0.05 0.0 0.02159375 0.008 0.15 0.04 0.362 0.1 0.2 0.0 0.04 1 715353 1 817085 1 851010 1 833725 1 286835 1 948614 1 881652 1 507110 1 27346 1 646986 1 643076 1 200960 1 18464 1 202774 1 532679 1 729573 1 342789 1 562805 1 880474 1 984402 1 666449 1 26235 1 700326 1 452909 1 884722 1 787527 1 0
click:0 dense_feature:0.05 dense_feature:0.00663349917081 dense_feature:0.05 dense_feature:0.0 dense_feature:0.02159375 dense_feature:0.008 dense_feature:0.15 dense_feature:0.04 dense_feature:0.362 dense_feature:0.1 dense_feature:0.2 dense_feature:0.0 dense_feature:0.04 1:715353 2:817085 3:851010 4:833725 5:286835 6:948614 7:881652 8:507110 9:27346 10:646986 11:643076 12:200960 13:18464 14:202774 15:532679 16:729573 17:342789 18:562805 19:880474 20:984402 21:666449 22:26235 23:700326 24:452909 25:884722 26:787527
...
```
#
## 模型组网
### 数据输入声明
正如数据准备章节所介绍,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:
`dense_input`
用于输入连续数据,维度由超参数
`dense_feature_dim`
指定,数据类型是归一化后的浮点型数据。
`sparse_input_ids`
用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为
`C1~C26`
的26个稀疏参数输入,并设置
`lod_level=1`
,代表其为变长数据,数据类型为整数;最后是每条样本的
`label`
,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。
在Paddle中数据输入的声明使用
`paddle.fluid.data()`
,会创建指定类型的占位符,数据IO会依据此定义进行数据的输入。
```
python
dense_input
=
fluid
.
data
(
name
=
"dense_input"
,
shape
=
[
-
1
,
args
.
dense_feature_dim
],
dtype
=
"float32"
)
sparse_input_ids
=
[
fluid
.
data
(
name
=
"C"
+
str
(
i
),
shape
=
[
-
1
,
1
],
lod_level
=
1
,
dtype
=
"int64"
)
for
i
in
range
(
1
,
27
)
]
label
=
fluid
.
data
(
name
=
"label"
,
shape
=
[
-
1
,
1
],
dtype
=
"int64"
)
inputs
=
[
dense_input
]
+
sparse_input_ids
+
[
label
]
```
正如数据准备章节所介绍,Criteo数据集中,分为连续数据与离散(稀疏)数据,所以整体而言,CTR-DNN模型的数据输入层包括三个,分别是:
`dense_input`
用于输入连续数据,维度由超参数
`dense_input_dim`
指定,数据类型是归一化后的浮点型数据。
`sparse_inputs`
用于记录离散数据,在Criteo数据集中,共有26个slot,所以我们创建了名为
`1~26`
的26个稀疏参数输入,数据类型为整数;最后是每条样本的
`label`
,代表了是否被点击,数据类型是整数,0代表负样例,1代表正样例。
### CTR-DNN模型组网
CTR-DNN模型的组网比较直观,本质是一个二分类任务,代码参考
`model.py`
。模型主要组成是一个
`Embedding`
层,
三
个
`FC`
层,以及相应的分类任务的loss计算和auc计算。
CTR-DNN模型的组网比较直观,本质是一个二分类任务,代码参考
`model.py`
。模型主要组成是一个
`Embedding`
层,
四
个
`FC`
层,以及相应的分类任务的loss计算和auc计算。
#### Embedding层
首先介绍Embedding层的搭建方式:
`Embedding`
层的输入是
`sparse_input`
,
shape由超参的
`sparse_feature_dim`
和
`embedding_siz
e`
定义。需要特别解释的是
`is_sparse`
参数,当我们指定
`is_sprase=True`
后,计算图会将该参数视为稀疏参数,反向更新以及分布式通信时,都以稀疏的方式进行,会极大的提升运行效率,同时保证效果一致。
首先介绍Embedding层的搭建方式:
`Embedding`
层的输入是
`sparse_input`
,
由超参的
`sparse_feature_number`
和
`sparse_feature_dimshap
e`
定义。需要特别解释的是
`is_sparse`
参数,当我们指定
`is_sprase=True`
后,计算图会将该参数视为稀疏参数,反向更新以及分布式通信时,都以稀疏的方式进行,会极大的提升运行效率,同时保证效果一致。
各个稀疏的输入通过Embedding层后,将其合并起来,置于一个list内,以方便进行concat的操作。
```
python
def
embedding_layer
(
input
):
return
fluid
.
layers
.
embedding
(
if
self
.
distributed_embedding
:
emb
=
fluid
.
contrib
.
layers
.
sparse_embedding
(
input
=
input
,
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()))
else
:
emb
=
fluid
.
layers
.
embedding
(
input
=
input
,
is_sparse
=
True
,
size
=
[
args
.
sparse_feature_dim
,
args
.
embedding_size
],
is_distributed
=
self
.
is_distributed
,
size
=
[
self
.
sparse_feature_number
,
self
.
sparse_feature_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()),
)
name
=
"SparseFeatFactors"
,
initializer
=
fluid
.
initializer
.
Uniform
()))
emb_sum
=
fluid
.
layers
.
sequence_pool
(
input
=
emb
,
pool_type
=
'sum'
)
return
emb_sum
sparse_embed_seq
=
list
(
map
(
embedding_layer
,
inputs
[
1
:
-
1
]
))
# [C1~C26]
sparse_embed_seq
=
list
(
map
(
embedding_layer
,
self
.
sparse_inputs
))
# [C1~C26]
```
#### FC层
将离散数据通过embedding查表得到的值,与连续数据的输入进行
`concat`
操作,合为一个整体输入,作为全链接层的原始输入。我们共设计了
3层FC,每层FC的输出维度都为400
,每层FC都后接一个
`relu`
激活函数,每层FC的初始化方式为符合正态分布的随机初始化,标准差与上一层的输出维度的平方根成反比。
将离散数据通过embedding查表得到的值,与连续数据的输入进行
`concat`
操作,合为一个整体输入,作为全链接层的原始输入。我们共设计了
4层FC,每层FC的输出维度由超参
`fc_sizes`
指定
,每层FC都后接一个
`relu`
激活函数,每层FC的初始化方式为符合正态分布的随机初始化,标准差与上一层的输出维度的平方根成反比。
```
python
concated
=
fluid
.
layers
.
concat
(
sparse_embed_seq
+
inputs
[
0
:
1
],
axis
=
1
)
fc1
=
fluid
.
layers
.
fc
(
input
=
concated
,
size
=
400
,
act
=
"relu"
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1
/
math
.
sqrt
(
concated
.
shape
[
1
]))),
)
fc2
=
fluid
.
layers
.
fc
(
input
=
fc1
,
size
=
400
,
act
=
"relu"
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1
/
math
.
sqrt
(
fc1
.
shape
[
1
]))),
)
fc3
=
fluid
.
layers
.
fc
(
input
=
fc2
,
size
=
400
,
act
=
"relu"
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1
/
math
.
sqrt
(
fc2
.
shape
[
1
]))),
)
concated
=
fluid
.
layers
.
concat
(
sparse_embed_seq
+
[
self
.
dense_input
],
axis
=
1
)
fcs
=
[
concated
]
hidden_layers
=
envs
.
get_global_env
(
"hyper_parameters.fc_sizes"
)
for
size
in
hidden_layers
:
output
=
fluid
.
layers
.
fc
(
input
=
fcs
[
-
1
],
size
=
size
,
act
=
'relu'
,
param_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Normal
(
scale
=
1.0
/
math
.
sqrt
(
fcs
[
-
1
].
shape
[
1
]))))
fcs
.
append
(
output
)
```
#### Loss及Auc计算
-
预测的结果通过一个输出shape为2的FC层给出,该FC层的激活函数是softmax,会给出每条样本分属于正负样本的概率。
-
每条样本的损失由交叉熵给出,交叉熵的输入维度为[batch_size,2],数据类型为float,label的输入维度为[batch_size,1],数据类型为int。
-
该batch的损失
`avg_cost`
是各条样本的损失之和
-
我们同时还会计算预测的auc,auc的结果由
`fluid.layers.auc()`
给出,该层的返回值有三个,分别是
全局auc:
`auc_var`
,当前batch的auc:
`batch_auc_var`
,以及auc_states:
`auc_states
`
,auc_states包含了
`batch_stat_pos, batch_stat_neg, stat_pos, stat_neg`
信息。
`batch_auc`
我们取近20个batch的平均,由参数
`slide_steps=20`
指定,roc曲线的离散化的临界数值设置为4096,由
`num_thresholds=2**12`
指定。
-
我们同时还会计算预测的auc,auc的结果由
`fluid.layers.auc()`
给出,该层的返回值有三个,分别是
从第一个batch累计到当前batch的全局auc:
`auc`
,最近几个batch的auc:
`batch_auc`
,以及auc_states:
`_
`
,auc_states包含了
`batch_stat_pos, batch_stat_neg, stat_pos, stat_neg`
信息。
`batch_auc`
我们取近20个batch的平均,由参数
`slide_steps=20`
指定,roc曲线的离散化的临界数值设置为4096,由
`num_thresholds=2**12`
指定。
```
predict = fluid.layers.fc(
input=fc3,
size=2,
act="softmax",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))),
)
cost = fluid.layers.cross_entropy(input=predict, label=inputs[-1])
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=inputs[-1])
auc_var, batch_auc_var, auc_states = fluid.layers.auc(
input=predict,
label=inputs[-1],
num_thresholds=2**12,
slide_steps=20)
```
input=fcs[-1],
size=2,
act="softmax",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fcs[-1].shape[1]))))
完成上述组网后,我们最终可以通过训练拿到
`avg_cost`
与
`auc`
两个重要指标。
self.predict = predict
auc, batch_auc, _ = fluid.layers.auc(input=self.predict,label=self.label_input,
num_thresholds=2**12,
slide_steps=20)
cost = fluid.layers.cross_entropy(
input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_mean(cost)
```
完成上述组网后,我们最终可以通过训练拿到
`BATCH_AUC`
与
`auc`
两个重要指标。
```
PaddleRec: Runner single_cpu_infer Begin
Executor Mode: infer
processor_register begin
Running SingleInstance.
Running SingleNetwork.
Running SingleInferStartup.
Running SingleInferRunner.
load persistables from increment_dnn/3
batch: 20, BATCH_AUC: [0.75670043], AUC: [0.77490453]
batch: 40, BATCH_AUC: [0.77020144], AUC: [0.77490437]
batch: 60, BATCH_AUC: [0.77464683], AUC: [0.77490435]
batch: 80, BATCH_AUC: [0.76858989], AUC: [0.77490416]
batch: 100, BATCH_AUC: [0.75728286], AUC: [0.77490362]
batch: 120, BATCH_AUC: [0.75007016], AUC: [0.77490286]
...
batch: 720, BATCH_AUC: [0.76840144], AUC: [0.77489881]
batch: 740, BATCH_AUC: [0.76659033], AUC: [0.77489854]
batch: 760, BATCH_AUC: [0.77332639], AUC: [0.77489849]
batch: 780, BATCH_AUC: [0.78361653], AUC: [0.77489874]
Infer phase2 of epoch increment_dnn/3 done, use time: 52.7707588673, global metrics: BATCH_AUC=[0.78361653], AUC=[0.77489874]
PaddleRec Finish
```
## 流式训练(OnlineLearning)任务启动及配置流程
...
...
@@ -387,5 +409,5 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc(
```
4.
准备好数据后, 即可按照标准的训练流程进行流式训练了
```
shell
python
-m
paddlerec.run
-m
models/r
erank/ctr-
dnn/config.yaml
python
-m
paddlerec.run
-m
models/r
ank/
dnn/config.yaml
```
models/rank/dnn/data/get_slot_data.py
浏览文件 @
6de9c8da
...
...
@@ -61,8 +61,7 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
s
+=
" dense_feature:"
+
str
(
i
)
for
i
in
range
(
1
,
1
+
len
(
categorical_range_
)):
s
+=
" "
+
str
(
i
)
+
":"
+
str
(
sparse_feature
[
i
-
1
][
0
])
print
(
s
.
strip
())
yield
None
print
(
s
.
strip
())
# add print for data preprocessing
return
reader
...
...
models/recall/gru4rec/README.md
0 → 100644
浏览文件 @
6de9c8da
# GRU4REC
以下是本例的简要目录结构及说明:
```
├── data #样例数据及数据处理相关文件
├── train
├── small_train.txt # 样例训练数据
├── test
├── small_test.txt # 样例测试数据
├── convert_format.py # 数据转换脚本
├── download.py # 数据下载脚本
├── preprocess.py # 数据预处理脚本
├── text2paddle.py # paddle训练数据生成脚本
├── __init__.py
├── README.md # 文档
├── model.py #模型文件
├── config.yaml #配置文件
├── data_prepare.sh #一键数据处理脚本
├── rsc15_reader.py #reader
```
注:在阅读该示例前,建议您先了解以下内容:
[
paddlerec入门教程
](
https://github.com/PaddlePaddle/PaddleRec/blob/master/README.md
)
---
## 内容
-
[
模型简介
](
#模型简介
)
-
[
数据准备
](
#数据准备
)
-
[
运行环境
](
#运行环境
)
-
[
快速开始
](
#快速开始
)
-
[
论文复现
](
#论文复现
)
-
[
进阶使用
](
#进阶使用
)
-
[
FAQ
](
#FAQ
)
## 模型简介
GRU4REC模型的介绍可以参阅论文
[
Session-based Recommendations with Recurrent Neural Networks
](
https://arxiv.org/abs/1511.06939
)
。
论文的贡献在于首次将RNN(GRU)运用于session-based推荐,相比传统的KNN和矩阵分解,效果有明显的提升。
论文的核心思想是在一个session中,用户点击一系列item的行为看做一个序列,用来训练RNN模型。预测阶段,给定已知的点击序列作为输入,预测下一个可能点击的item。
session-based推荐应用场景非常广泛,比如用户的商品浏览、新闻点击、地点签到等序列数据。
本模型配置默认使用demo数据集,若进行精度验证,请参考
[
论文复现
](
#论文复现
)
部分。
本项目支持功能
训练:单机CPU、单机单卡GPU、本地模拟参数服务器训练、增量训练,配置请参考
[
启动训练
](
https://github.com/PaddlePaddle/PaddleRec/blob/master/doc/train.md
)
预测:单机CPU、单机单卡GPU;配置请参考
[
PaddleRec 离线预测
](
https://github.com/PaddlePaddle/PaddleRec/blob/master/doc/predict.md
)
## 数据处理
本示例中数据处理共包含三步:
-
Step1: 原始数据数据集下载
```
cd data/
python download.py
```
-
Step2: 数据预处理及格式转换。
1.
以session_id为key合并原始数据集,得到每个session的日期,及顺序点击列表。
2.
过滤掉长度为1的session;过滤掉点击次数小于5的items。
3.
训练集、测试集划分。原始数据集里最新日期七天内的作为训练集,更早之前的数据作为测试集。
```
python preprocess.py
python convert_format.py
```
这一步之后,会在data/目录下得到两个文件,rsc15_train_tr_paddle.txt为原始训练文件,rsc15_test_paddle.txt为原始测试文件。格式如下所示:
```
214536502 214536500 214536506 214577561
214662742 214662742 214825110 214757390 214757407 214551617
214716935 214774687 214832672
214836765 214706482
214701242 214826623
214826835 214826715
214838855 214838855
214576500 214576500 214576500
214821275 214821275 214821371 214821371 214821371 214717089 214563337 214706462 214717436 214743335 214826837 214819762
214717867 21471786
```
-
Step3: 生成字典并整理数据路径。这一步会根据训练和测试文件生成字典和对应的paddle输入文件,并将训练文件统一放在data/all_train目录下,测试文件统一放在data/all_test目录下。
```
mkdir raw_train_data && mkdir raw_test_data
mv rsc15_train_tr_paddle.txt raw_train_data/ && mv rsc15_test_paddle.txt raw_test_data/
mkdir all_train && mkdir all_test
python text2paddle.py raw_train_data/ raw_test_data/ all_train all_test vocab.txt
```
方便起见,我们提供了一键式数据生成脚本:
```
sh data_prepare.sh
```
## 运行环境
PaddlePaddle>=1.7.2
python 2.7/3.5/3.6/3.7
PaddleRec >=0.1
os : windows/linux/macos
## 快速开始
### 单机训练
在config.yaml文件中设置好设备,epochs等。
```
runner:
- name: cpu_train_runner
class: train
device: cpu # gpu
epochs: 10
save_checkpoint_interval: 1
save_inference_interval: 1
save_checkpoint_path: "increment_gru4rec"
save_inference_path: "inference_gru4rec"
save_inference_feed_varnames: ["src_wordseq", "dst_wordseq"] # feed vars of save inference
save_inference_fetch_varnames: ["mean_0.tmp_0", "top_k_0.tmp_0"]
print_interval: 10
phases: [train]
```
### 单机预测
在config.yaml文件中设置好设备,epochs等。
```
- name: cpu_infer_runner
class: infer
init_model_path: "increment_gru4rec"
device: cpu # gpu
phases: [infer]
```
### 运行
```
python -m paddlerec.run -m paddlerec.models.recall.gru4rec
```
### 结果展示
样例数据训练结果展示:
```
Running SingleStartup.
Running SingleRunner.
2020-09-22 03:31:18,167-INFO: [Train], epoch: 0, batch: 10, time_each_interval: 4.34s, RecallCnt: [1669.], cost: [8.366313], InsCnt: [16228.], Acc(Recall@20): [0.10284693]
2020-09-22 03:31:21,982-INFO: [Train], epoch: 0, batch: 20, time_each_interval: 3.82s, RecallCnt: [3168.], cost: [8.170701], InsCnt: [31943.], Acc(Recall@20): [0.09917666]
2020-09-22 03:31:25,797-INFO: [Train], epoch: 0, batch: 30, time_each_interval: 3.81s, RecallCnt: [4855.], cost: [8.017181], InsCnt: [47892.], Acc(Recall@20): [0.10137393]
...
epoch 0 done, use time: 6003.78719687, global metrics: cost=[4.4394927], InsCnt=23622448.0 RecallCnt=14547467.0 Acc(Recall@20)=0.6158323218660487
2020-09-22 05:11:17,761-INFO: save epoch_id:0 model into: "inference_gru4rec/0"
...
epoch 9 done, use time: 6009.97707605, global metrics: cost=[4.069373], InsCnt=236237470.0 RecallCnt=162838200.0 Acc(Recall@20)=0.6892988086157644
2020-09-22 20:17:11,358-INFO: save epoch_id:9 model into: "inference_gru4rec/9"
PaddleRec Finish
```
样例数据预测结果展示:
```
Running SingleInferStartup.
Running SingleInferRunner.
load persistables from increment_gru4rec/9
2020-09-23 03:46:21,081-INFO: [Infer] batch: 20, time_each_interval: 3.68s, RecallCnt: [24875.], InsCnt: [35581.], Acc(Recall@20): [0.6991091]
Infer infer of epoch 9 done, use time: 5.25408315659, global metrics: InsCnt=52551.0 RecallCnt=36720.0 Acc(Recall@20)=0.698749785922247
...
Infer infer of epoch 0 done, use time: 5.20699501038, global metrics: InsCnt=52551.0 RecallCnt=33664.0 Acc(Recall@20)=0.6405967536298073
PaddleRec Finish
```
## 论文复现
用原论文的完整数据复现论文效果需要在config.yaml修改超参:
-
batch_size: 修改config.yaml中dataset_train数据集的batch_size为500。
-
epochs: 修改config.yaml中runner的epochs为10。
-
数据源:修改config.yaml中dataset_train数据集的data_path为"{workspace}/data/all_train",dataset_test数据集的data_path为"{workspace}/data/all_test"。
使用gpu训练10轮 测试结果为
epoch | 测试recall@20 | 速度(s)
-- | -- | --
1 | 0.6406 | 6003
2 | 0.6727 | 6007
3 | 0.6831 | 6108
4 | 0.6885 | 6025
5 | 0.6913 | 6019
6 | 0.6931 | 6011
7 | 0.6952 | 6015
8 | 0.6968 | 6076
9 | 0.6972 | 6076
10 | 0.6987| 6009
修改后运行方案:修改config.yaml中的'workspace'为config.yaml的目录位置,执行
```
python -m paddlerec.run -m /home/your/dir/config.yaml #调试模式 直接指定本地config的绝对路径
```
## 进阶使用
## FAQ
models/recall/gru4rec/config.yaml
浏览文件 @
6de9c8da
...
...
@@ -16,18 +16,19 @@ workspace: "models/recall/gru4rec"
dataset
:
-
name
:
dataset_train
batch_size
:
5
type
:
QueueDataset
batch_size
:
5
00
type
:
DataLoader
#
QueueDataset
data_path
:
"
{workspace}/data/train"
data_converter
:
"
{workspace}/rsc15_reader.py"
-
name
:
dataset_infer
batch_size
:
5
type
:
QueueDataset
batch_size
:
5
00
type
:
DataLoader
#
QueueDataset
data_path
:
"
{workspace}/data/test"
data_converter
:
"
{workspace}/rsc15_reader.py"
hyper_parameters
:
vocab_size
:
1000
recall_k
:
20
vocab_size
:
37483
hid_size
:
100
emb_lr_x
:
10.0
gru_lr_x
:
1.0
...
...
@@ -40,30 +41,34 @@ hyper_parameters:
strategy
:
async
#use infer_runner mode and modify 'phase' below if infer
mode
:
train_runner
mode
:
[
cpu_train_runner
,
cpu_infer_runner
]
#mode: infer_runner
runner
:
-
name
:
train_runner
-
name
:
cpu_
train_runner
class
:
train
device
:
cpu
epochs
:
3
save_checkpoint_interval
:
2
save_inference_interval
:
4
save_checkpoint_path
:
"
increment"
save_inference_path
:
"
inference"
epochs
:
10
save_checkpoint_interval
:
1
save_inference_interval
:
1
save_checkpoint_path
:
"
increment_gru4rec"
save_inference_path
:
"
inference_gru4rec"
save_inference_feed_varnames
:
[
"
src_wordseq"
,
"
dst_wordseq"
]
# feed vars of save inference
save_inference_fetch_varnames
:
[
"
mean_0.tmp_0"
,
"
top_k_0.tmp_0"
]
print_interval
:
10
-
name
:
infer_runner
phases
:
[
train
]
-
name
:
cpu_infer_runner
class
:
infer
init_model_path
:
"
increment
/0
"
init_model_path
:
"
increment
_gru4rec
"
device
:
cpu
phases
:
[
infer
]
phase
:
-
name
:
train
model
:
"
{workspace}/model.py"
dataset_name
:
dataset_train
thread_num
:
1
#
- name: infer
#
model: "{workspace}/model.py"
#
dataset_name: dataset_infer
#
thread_num: 1
-
name
:
infer
model
:
"
{workspace}/model.py"
dataset_name
:
dataset_infer
thread_num
:
1
models/recall/gru4rec/data/convert_format.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
sys
import
codecs
def
convert_format
(
input
,
output
):
with
codecs
.
open
(
input
,
"r"
,
encoding
=
'utf-8'
)
as
rf
:
with
codecs
.
open
(
output
,
"w"
,
encoding
=
'utf-8'
)
as
wf
:
last_sess
=
-
1
sign
=
1
i
=
0
for
l
in
rf
:
i
=
i
+
1
if
i
==
1
:
continue
if
(
i
%
1000000
==
1
):
print
(
i
)
tokens
=
l
.
strip
().
split
()
if
(
int
(
tokens
[
0
])
!=
last_sess
):
if
(
sign
):
sign
=
0
wf
.
write
(
tokens
[
1
]
+
" "
)
else
:
wf
.
write
(
"
\n
"
+
tokens
[
1
]
+
" "
)
last_sess
=
int
(
tokens
[
0
])
else
:
wf
.
write
(
tokens
[
1
]
+
" "
)
input
=
"rsc15_train_tr.txt"
output
=
"rsc15_train_tr_paddle.txt"
input2
=
"rsc15_test.txt"
output2
=
"rsc15_test_paddle.txt"
convert_format
(
input
,
output
)
convert_format
(
input2
,
output2
)
models/recall/gru4rec/data/download.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
requests
import
sys
import
time
import
os
lasttime
=
time
.
time
()
FLUSH_INTERVAL
=
0.1
def
progress
(
str
,
end
=
False
):
global
lasttime
if
end
:
str
+=
"
\n
"
lasttime
=
0
if
time
.
time
()
-
lasttime
>=
FLUSH_INTERVAL
:
sys
.
stdout
.
write
(
"
\r
%s"
%
str
)
lasttime
=
time
.
time
()
sys
.
stdout
.
flush
()
def
_download_file
(
url
,
savepath
,
print_progress
):
r
=
requests
.
get
(
url
,
stream
=
True
)
total_length
=
r
.
headers
.
get
(
'content-length'
)
if
total_length
is
None
:
with
open
(
savepath
,
'wb'
)
as
f
:
shutil
.
copyfileobj
(
r
.
raw
,
f
)
else
:
with
open
(
savepath
,
'wb'
)
as
f
:
dl
=
0
total_length
=
int
(
total_length
)
starttime
=
time
.
time
()
if
print_progress
:
print
(
"Downloading %s"
%
os
.
path
.
basename
(
savepath
))
for
data
in
r
.
iter_content
(
chunk_size
=
4096
):
dl
+=
len
(
data
)
f
.
write
(
data
)
if
print_progress
:
done
=
int
(
50
*
dl
/
total_length
)
progress
(
"[%-50s] %.2f%%"
%
(
'='
*
done
,
float
(
100
*
dl
)
/
total_length
))
if
print_progress
:
progress
(
"[%-50s] %.2f%%"
%
(
'='
*
50
,
100
),
end
=
True
)
_download_file
(
"https://paddlerec.bj.bcebos.com/gnn%2Fyoochoose-clicks.dat"
,
"./yoochoose-clicks.dat"
,
True
)
models/recall/gru4rec/data/preprocess.py
0 → 100644
浏览文件 @
6de9c8da
# -*- coding: utf-8 -*-
"""
Created on Fri Jun 25 16:20:12 2015
@author: Balázs Hidasi
"""
import
numpy
as
np
import
pandas
as
pd
import
datetime
as
dt
import
time
PATH_TO_ORIGINAL_DATA
=
'./'
PATH_TO_PROCESSED_DATA
=
'./'
data
=
pd
.
read_csv
(
PATH_TO_ORIGINAL_DATA
+
'yoochoose-clicks.dat'
,
sep
=
','
,
header
=
0
,
usecols
=
[
0
,
1
,
2
],
dtype
=
{
0
:
np
.
int32
,
1
:
str
,
2
:
np
.
int64
})
data
.
columns
=
[
'session_id'
,
'timestamp'
,
'item_id'
]
data
[
'Time'
]
=
data
.
timestamp
.
apply
(
lambda
x
:
time
.
mktime
(
dt
.
datetime
.
strptime
(
x
,
'%Y-%m-%dT%H:%M:%S.%fZ'
).
timetuple
()))
#This is not UTC. It does not really matter.
del
(
data
[
'timestamp'
])
session_lengths
=
data
.
groupby
(
'session_id'
).
size
()
data
=
data
[
np
.
in1d
(
data
.
session_id
,
session_lengths
[
session_lengths
>
1
]
.
index
)]
item_supports
=
data
.
groupby
(
'item_id'
).
size
()
data
=
data
[
np
.
in1d
(
data
.
item_id
,
item_supports
[
item_supports
>=
5
].
index
)]
session_lengths
=
data
.
groupby
(
'session_id'
).
size
()
data
=
data
[
np
.
in1d
(
data
.
session_id
,
session_lengths
[
session_lengths
>=
2
]
.
index
)]
tmax
=
data
.
Time
.
max
()
session_max_times
=
data
.
groupby
(
'session_id'
).
Time
.
max
()
session_train
=
session_max_times
[
session_max_times
<
tmax
-
86400
].
index
session_test
=
session_max_times
[
session_max_times
>=
tmax
-
86400
].
index
train
=
data
[
np
.
in1d
(
data
.
session_id
,
session_train
)]
test
=
data
[
np
.
in1d
(
data
.
session_id
,
session_test
)]
test
=
test
[
np
.
in1d
(
test
.
item_id
,
train
.
item_id
)]
tslength
=
test
.
groupby
(
'session_id'
).
size
()
test
=
test
[
np
.
in1d
(
test
.
session_id
,
tslength
[
tslength
>=
2
].
index
)]
print
(
'Full train set
\n\t
Events: {}
\n\t
Sessions: {}
\n\t
Items: {}'
.
format
(
len
(
train
),
train
.
session_id
.
nunique
(),
train
.
item_id
.
nunique
()))
train
.
to_csv
(
PATH_TO_PROCESSED_DATA
+
'rsc15_train_full.txt'
,
sep
=
'
\t
'
,
index
=
False
)
print
(
'Test set
\n\t
Events: {}
\n\t
Sessions: {}
\n\t
Items: {}'
.
format
(
len
(
test
),
test
.
session_id
.
nunique
(),
test
.
item_id
.
nunique
()))
test
.
to_csv
(
PATH_TO_PROCESSED_DATA
+
'rsc15_test.txt'
,
sep
=
'
\t
'
,
index
=
False
)
tmax
=
train
.
Time
.
max
()
session_max_times
=
train
.
groupby
(
'session_id'
).
Time
.
max
()
session_train
=
session_max_times
[
session_max_times
<
tmax
-
86400
].
index
session_valid
=
session_max_times
[
session_max_times
>=
tmax
-
86400
].
index
train_tr
=
train
[
np
.
in1d
(
train
.
session_id
,
session_train
)]
valid
=
train
[
np
.
in1d
(
train
.
session_id
,
session_valid
)]
valid
=
valid
[
np
.
in1d
(
valid
.
item_id
,
train_tr
.
item_id
)]
tslength
=
valid
.
groupby
(
'session_id'
).
size
()
valid
=
valid
[
np
.
in1d
(
valid
.
session_id
,
tslength
[
tslength
>=
2
].
index
)]
print
(
'Train set
\n\t
Events: {}
\n\t
Sessions: {}
\n\t
Items: {}'
.
format
(
len
(
train_tr
),
train_tr
.
session_id
.
nunique
(),
train_tr
.
item_id
.
nunique
()))
train_tr
.
to_csv
(
PATH_TO_PROCESSED_DATA
+
'rsc15_train_tr.txt'
,
sep
=
'
\t
'
,
index
=
False
)
print
(
'Validation set
\n\t
Events: {}
\n\t
Sessions: {}
\n\t
Items: {}'
.
format
(
len
(
valid
),
valid
.
session_id
.
nunique
(),
valid
.
item_id
.
nunique
()))
valid
.
to_csv
(
PATH_TO_PROCESSED_DATA
+
'rsc15_train_valid.txt'
,
sep
=
'
\t
'
,
index
=
False
)
models/recall/gru4rec/data/text2paddle.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
sys
import
six
import
collections
import
os
import
sys
import
io
if
six
.
PY2
:
reload
(
sys
)
sys
.
setdefaultencoding
(
'utf-8'
)
def
word_count
(
input_file
,
word_freq
=
None
):
"""
compute word count from corpus
"""
if
word_freq
is
None
:
word_freq
=
collections
.
defaultdict
(
int
)
for
l
in
input_file
:
for
w
in
l
.
strip
().
split
():
word_freq
[
w
]
+=
1
return
word_freq
def
build_dict
(
min_word_freq
=
0
,
train_dir
=
""
,
test_dir
=
""
):
"""
Build a word dictionary from the corpus, Keys of the dictionary are words,
and values are zero-based IDs of these words.
"""
word_freq
=
collections
.
defaultdict
(
int
)
files
=
os
.
listdir
(
train_dir
)
for
fi
in
files
:
with
io
.
open
(
os
.
path
.
join
(
train_dir
,
fi
),
"r"
)
as
f
:
word_freq
=
word_count
(
f
,
word_freq
)
files
=
os
.
listdir
(
test_dir
)
for
fi
in
files
:
with
io
.
open
(
os
.
path
.
join
(
test_dir
,
fi
),
"r"
)
as
f
:
word_freq
=
word_count
(
f
,
word_freq
)
word_freq
=
[
x
for
x
in
six
.
iteritems
(
word_freq
)
if
x
[
1
]
>
min_word_freq
]
word_freq_sorted
=
sorted
(
word_freq
,
key
=
lambda
x
:
(
-
x
[
1
],
x
[
0
]))
words
,
_
=
list
(
zip
(
*
word_freq_sorted
))
word_idx
=
dict
(
list
(
zip
(
words
,
six
.
moves
.
range
(
len
(
words
)))))
return
word_idx
def
write_paddle
(
word_idx
,
train_dir
,
test_dir
,
output_train_dir
,
output_test_dir
):
files
=
os
.
listdir
(
train_dir
)
if
not
os
.
path
.
exists
(
output_train_dir
):
os
.
mkdir
(
output_train_dir
)
for
fi
in
files
:
with
io
.
open
(
os
.
path
.
join
(
train_dir
,
fi
),
"r"
)
as
f
:
with
io
.
open
(
os
.
path
.
join
(
output_train_dir
,
fi
),
"w"
)
as
wf
:
for
l
in
f
:
l
=
l
.
strip
().
split
()
l
=
[
word_idx
.
get
(
w
)
for
w
in
l
]
for
w
in
l
:
wf
.
write
(
str2file
(
str
(
w
)
+
" "
))
wf
.
write
(
str2file
(
"
\n
"
))
files
=
os
.
listdir
(
test_dir
)
if
not
os
.
path
.
exists
(
output_test_dir
):
os
.
mkdir
(
output_test_dir
)
for
fi
in
files
:
with
io
.
open
(
os
.
path
.
join
(
test_dir
,
fi
),
"r"
,
encoding
=
'utf-8'
)
as
f
:
with
io
.
open
(
os
.
path
.
join
(
output_test_dir
,
fi
),
"w"
,
encoding
=
'utf-8'
)
as
wf
:
for
l
in
f
:
l
=
l
.
strip
().
split
()
l
=
[
word_idx
.
get
(
w
)
for
w
in
l
]
for
w
in
l
:
wf
.
write
(
str2file
(
str
(
w
)
+
" "
))
wf
.
write
(
str2file
(
"
\n
"
))
def
str2file
(
str
):
if
six
.
PY2
:
return
str
.
decode
(
"utf-8"
)
else
:
return
str
def
text2paddle
(
train_dir
,
test_dir
,
output_train_dir
,
output_test_dir
,
output_vocab
):
vocab
=
build_dict
(
0
,
train_dir
,
test_dir
)
print
(
"vocab size:"
,
str
(
len
(
vocab
)))
with
io
.
open
(
output_vocab
,
"w"
,
encoding
=
'utf-8'
)
as
wf
:
wf
.
write
(
str2file
(
str
(
len
(
vocab
))
+
"
\n
"
))
write_paddle
(
vocab
,
train_dir
,
test_dir
,
output_train_dir
,
output_test_dir
)
train_dir
=
sys
.
argv
[
1
]
test_dir
=
sys
.
argv
[
2
]
output_train_dir
=
sys
.
argv
[
3
]
output_test_dir
=
sys
.
argv
[
4
]
output_vocab
=
sys
.
argv
[
5
]
text2paddle
(
train_dir
,
test_dir
,
output_train_dir
,
output_test_dir
,
output_vocab
)
models/recall/gru4rec/data_prepare.sh
0 → 100644
浏览文件 @
6de9c8da
#! /bin/bash
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set
-e
echo
"begin to download data"
cd
data
&&
python download.py
python preprocess.py
echo
"begin to convert data (binary -> txt)"
python convert_format.py
mkdir
raw_train_data
&&
mkdir
raw_test_data
mv
rsc15_train_tr_paddle.txt raw_train_data/
&&
mv
rsc15_test_paddle.txt raw_test_data/
mkdir
all_train
&&
mkdir
all_test
python text2paddle.py raw_train_data/ raw_test_data/ all_train all_test vocab.txt
models/recall/gru4rec/model.py
浏览文件 @
6de9c8da
...
...
@@ -16,6 +16,7 @@ import paddle.fluid as fluid
from
paddlerec.core.utils
import
envs
from
paddlerec.core.model
import
ModelBase
from
paddlerec.core.metrics
import
RecallK
class
Model
(
ModelBase
):
...
...
@@ -81,13 +82,13 @@ class Model(ModelBase):
high
=
self
.
init_high_bound
),
learning_rate
=
self
.
fc_lr_x
))
cost
=
fluid
.
layers
.
cross_entropy
(
input
=
fc
,
label
=
dst_wordseq
)
acc
=
fluid
.
layers
.
accuracy
(
input
=
fc
,
label
=
dst_wordseq
,
k
=
self
.
recall_k
)
acc
=
RecallK
(
input
=
fc
,
label
=
dst_wordseq
,
k
=
self
.
recall_k
)
if
is_infer
:
self
.
_infer_results
[
'
recall
20'
]
=
acc
self
.
_infer_results
[
'
Recall@
20'
]
=
acc
return
avg_cost
=
fluid
.
layers
.
mean
(
x
=
cost
)
self
.
_cost
=
avg_cost
self
.
_metrics
[
"cost"
]
=
avg_cost
self
.
_metrics
[
"
acc
"
]
=
acc
self
.
_metrics
[
"
Recall@20
"
]
=
acc
models/recall/word2vec/README.md
浏览文件 @
6de9c8da
...
...
@@ -222,15 +222,18 @@ Infer phase2 of epoch 3 done, use time: 4.43099021912, global metrics: acc=[1.]
## 论文复现
1.
用原论文的完整数据复现论文效果需要在config.yaml修改超参:
```
- name: dataset_train
batch_size: 100 # 1. 修改batch_size为100
type: DataLoader
data_path: "{workspace}/data/all_train" # 2. 修改数据为全量训练数据
word_count_dict_path: "{workspace}/data/all_dict/
word_count_dict.txt" # 3. 修改词表为全量词表
word_count_dict_path: "{workspace}/data/all_dict/word_count_dict.txt" # 3. 修改词表为全量词表
data_converter: "{workspace}/w2v_reader.py"
- name: dataset_infer
data_path: "{workspace}/data/all_test" # 4. 修改数据为全量测试数据
word_id_dict_path: "{workspace}/data/all_dict/word_id_dict.txt" # 5. 修改词表为全量词表
-
name: single_cpu_train
-
epochs: # 4. 修改config.yaml中runner的epochs为5。
```
修改后运行方案:修改config.yaml中的'workspace'为config.yaml的目录位置,执行
```
...
...
@@ -239,7 +242,7 @@ python -m paddlerec.run -m /home/your/dir/config.yaml #调试模式 直接指定
2.
使用自定义预测程序预测全量测试集:
```
python infer.py --test_dir ./data/all_test --dict_path ./data/all_dict/word_id_dict.txt --batch_size
20000 --model_dir ./increment_w2v/ --start_index 0 --last_index 5
--emb_size 300
python infer.py --test_dir ./data/all_test --dict_path ./data/all_dict/word_id_dict.txt --batch_size
10000 --model_dir ./increment_w2v/ --start_index 0 --last_index 4
--emb_size 300
```
结论:使用cpu训练5轮,自定义预测准确率为0.540,每轮训练时间7小时左右。
...
...
models/recall/youtube_dnn/README.md
浏览文件 @
6de9c8da
...
...
@@ -8,7 +8,7 @@
├── data.txt
├── test
├── data.txt
├── generate_ramdom_data # 随机训练数据生成文件
├── generate_ramdom_data
.py
# 随机训练数据生成文件
├── __init__.py
├── README.md # 文档
├── model.py #模型文件
...
...
@@ -107,7 +107,7 @@ python infer.py --use_gpu 1 --test_epoch 19 --inference_model_dir ./inference_yo
```
### 运行
```
python -m paddlerec.run -m paddlerec.models.recall.
w2v
python -m paddlerec.run -m paddlerec.models.recall.
youtube_dnn
```
### 结果展示
...
...
models/treebased/tdm/README.md
浏览文件 @
6de9c8da
...
...
@@ -13,6 +13,7 @@ cd paddle-rec
python
-m
paddlerec.run
-m
models/treebased/tdm/config.yaml
```
3.
建树及自定义训练的细节可以查阅
[
TDM-Demo建树及训练
](
./gen_tree/README.md
)
## 树结构的准备
### 名词概念
...
...
models/treebased/tdm/build_tree.md
已删除
100644 → 0
浏览文件 @
2cf8c6f0
wget https://paddlerec.bj.bcebos.com/utils/tree_build_utils.tar.gz --no-check-certificate
# input_path: embedding的路径
# emb_shape: embedding中key-value,value的维度
# emb格式要求: embedding_id(int64),embedding(float),embedding(float),......,embedding(float)
# cluster_threads: 建树聚类所用线程
python_172_anytree/bin/python -u main.py --input_path=./gen_emb/item_emb.txt --output_path=./ --emb_shape=24 --cluster_threads=4
建树流程是:1、读取emb -> 2、kmeans聚类 -> 3、聚类结果整理为树 -> 4、基于树结构得到模型所需的4个文件
1 Layer_list:记录了每一层都有哪些节点。训练用
2 Travel_list:记录每个叶子节点的Travel路径。训练用
3 Tree_Info:记录了每个节点的信息,主要为:是否是item/item_id,所在层级,父节点,子节点。检索用
4 Tree_Embedding:记录所有节点的Embedding。训练及检索用
注意一下训练数据输入的item是建树之前用的item id,还是基于树的node id,还是基于叶子的leaf id,在tdm_reader.py中,可以加载字典,做映射。
用厂内版建树得到的输出文件夹里,有名为id2nodeid.txt的映射文件,格式是『hash值』+ 『树节点ID』+『叶子节点ID(表示第几个叶子节点,tdm_sampler op 所需的输入)』
在另一个id2bidword.txt中,也有映射关系,格式是『hash值』+『原始item ID』,这个文件中仅存储了叶子节点的信息。
models/treebased/tdm/config.yaml
浏览文件 @
6de9c8da
...
...
@@ -59,49 +59,39 @@ hyper_parameters:
tree_emb_path
:
"
{workspace}/tree/tree_emb.npy"
# select runner by name
mode
:
runner1
# config of each runner.
# runner is a kind of paddle training class, which wraps the train/infer process.
mode
:
[
runner1
]
runner
:
-
name
:
runner1
class
:
train
startup_class_path
:
"
{workspace}/tdm_startup.py"
# num of epochs
epochs
:
10
# device to run training or infer
device
:
cpu
save_checkpoint_interval
:
2
# save model interval of epochs
save_inference_interval
:
4
# save inference
save_checkpoint_path
:
"
increment"
# save checkpoint path
save_inference_path
:
"
inference"
# save inference path
save_inference_feed_varnames
:
[]
# feed vars of save inference
save_inference_fetch_varnames
:
[]
# fetch vars of save inference
init_model_path
:
"
"
# load model path
print_interval
:
10
phases
:
[
phase1
]
-
name
:
runner2
class
:
infer
startup_class_path
:
"
{workspace}/tdm_startup.py"
# device to run training or infer
device
:
cpu
init_model_path
:
"
increment/0"
# load model path
print_interval
:
1
phases
:
[
phase2
]
-
name
:
runner3
class
:
local_cluster_train
startup_class_path
:
"
{workspace}/tdm_startup.py"
fleet_mode
:
ps
epochs
:
10
# device to run training or infer
device
:
cpu
save_checkpoint_interval
:
2
# save model interval of epochs
save_inference_interval
:
4
# save inference
save_checkpoint_path
:
"
increment"
# save checkpoint path
save_inference_path
:
"
inference"
# save inference path
save_inference_feed_varnames
:
[]
# feed vars of save inference
save_inference_fetch_varnames
:
[]
# fetch vars of save inference
init_model_path
:
"
init_model"
# load model path
print_interval
:
10
phases
:
[
phase1
]
# runner will run all the phase in each epoch
phase
:
...
...
@@ -109,7 +99,7 @@ phase:
model
:
"
{workspace}/model.py"
# user-defined model
dataset_name
:
dataset_train
# select dataset by name
thread_num
:
1
#
- name: phase2
#
model: "{workspace}/model.py"
#
dataset_name: dataset_infer
#
thread_num: 2
-
name
:
phase2
model
:
"
{workspace}/model.py"
dataset_name
:
dataset_infer
thread_num
:
2
models/treebased/tdm/gen_tree/README.md
0 → 100644
浏览文件 @
6de9c8da
# TDM-Demo建树及训练
## 建树所需环境
Requirements:
-
python >= 2.7
-
paddlepaddle >= 1.7.2(建议1.7.2)
-
paddle-rec (克隆github paddlerec,执行python setup.py install)
-
sklearn
-
anytree
## 建树流程
### 生成建树所需Embedding
-
生成Fake的emb
```
shell
cd
gen_tree
python
-u
emb_util.py
```
生成的emb维度是[13, 64],含义是共有13个item,每个item的embedding维度是64,生成的item_emb位于
`gen_tree/item_emb.txt`
格式为
`emb_value_0(float) 空格 emb_value_1(float) ... emb_value_63(float) \t item_id `
在demo中,要求item的编号从0开始,范围 [0, item_nums-1]
真实场景可以通过各种hash映射满足该要求
### 对Item_embedding进行聚类建树
执行
```
shell
cd
gen_tree
# emd_path: item_emb的地址
# emb_size: item_emb的第二个维度,即每个item的emb的size(示例中为64)
# threads: 多线程建树配置的线程数
# n_clusters: 最终建树为几叉树,此处设置为2叉树
python gen_tree.py
--emd_path
item_emb.txt
--emb_size
64
--output_dir
./output
--threads
1
--n_clusters
2
```
生成的训练所需树结构文件位于
`gen_tree/output`
```
shell
.
├── id2item.json
# 树节点id到item id的映射表
├── layer_list.txt
# 树的每个层级都有哪些节点
├── travel_list.npy
# 每个item从根到叶子的遍历路径,按item顺序排序
├── travel_list.txt
# 上个文件的明文txt
├── tree_embedding.txt
# 所有节点按节点id排列组成的embedding
├── tree_emb.npy
# 上个文件的.npy版本
├── tree_info.npy
# 每个节点:是否对应item/父/层级/子节点,按节点顺序排列
├── tree_info.txt
# 上个文件的明文txt
└── tree.pkl
# 聚类得到的树结构
```
我们最终需要使用建树生成的以下四个文件,参与网络训练,参考
`models/treebased/tdm/config.yaml`
1.
layer_list.txt
2.
travel_list.npy
3.
tree_info.npy
4.
tree_emb.npy
### 执行训练
-
更改
`config.yaml`
中的配置
首先更改
```
yaml
hyper_parameters
:
# ...
tree
:
# 单机训练建议tree只load一次,保存为paddle tensor,之后从paddle模型热启
# 分布式训练trainer需要独立load
# 预测时也改为从paddle模型加载
load_tree_from_numpy
:
True
# only once
load_paddle_model
:
False
# train & infer need, after load from npy, change it to True
tree_layer_path
:
"
{workspace}/tree/layer_list.txt"
tree_travel_path
:
"
{workspace}/tree/travel_list.npy"
tree_info_path
:
"
{workspace}/tree/tree_info.npy"
tree_emb_path
:
"
{workspace}/tree/tree_emb.npy"
```
将上述几个path改为建树得到的文件所在的地址
再更改
```
yaml
hyper_parameters
:
max_layers
:
4
# 不含根节点,树的层数
node_nums
:
26
# 树共有多少个节点,数量与tree_info文件的行数相等
leaf_node_nums
:
13
# 树共有多少个叶子节点
layer_node_num_list
:
[
2
,
4
,
8
,
10
]
# 树的每层有多少个节点
child_nums
:
2
# 每个节点最多有几个孩子结点(几叉树)
neg_sampling_list
:
[
1
,
2
,
3
,
4
]
# 在树的每层做多少负采样,训练自定义的参数
```
若并不知道对上面几个参数具体值,可以试运行一下,paddlerec读取建树生成的文件后,会将具体信息打印到屏幕上,如下所示:
```
shell
...
File_list:
[
'models/treebased/tdm/data/train/demo_fake_input.txt'
]
2020-09-10 15:17:19,259 - INFO - Run TDM Trainer Startup Pass
2020-09-10 15:17:19,283 - INFO - load tree from numpy
2020-09-10 15:17:19,284 - INFO - TDM Tree leaf node nums: 13
2020-09-10 15:17:19,284 - INFO - TDM Tree max layer: 4
2020-09-10 15:17:19,284 - INFO - TDM Tree layer_node_num_list:
[
2, 4, 8, 10]
2020-09-10 15:17:19,285 - INFO - Begin Save Init model.
2020-09-10 15:17:19,394 - INFO - End Save Init model.
Running SingleRunner.
...
```
将其抄到配置中即可
-
训练
执行
```
cd /PaddleRec # PaddleRec 克隆的根目录
python -m paddlerec.run -m models/treebased/tdm/config.yaml
```
models/treebased/tdm/gen_tree/__init__.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
.
import
cluster
__all__
=
[]
__all__
+=
cluster
.
__all__
models/treebased/tdm/gen_tree/cluster.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (C) 2016-2018 Alibaba Group Holding Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
codecs
import
os
import
time
import
collections
import
argparse
import
multiprocessing
as
mp
import
numpy
as
np
from
sklearn.cluster
import
KMeans
import
tree_builder
__all__
=
[
'Cluster'
]
class
Cluster
:
def
__init__
(
self
,
filename
,
emb_size
,
id_offset
=
None
,
parall
=
16
,
prev_result
=
None
,
output_dir
=
'./'
,
_n_clusters
=
2
):
self
.
filename
=
filename
self
.
emb_size
=
emb_size
self
.
mini_batch
=
256
self
.
ids
=
None
self
.
data
=
None
self
.
items
=
None
self
.
parall
=
parall
self
.
queue
=
None
self
.
timeout
=
5
self
.
id_offset
=
id_offset
self
.
codes
=
None
self
.
prev_result
=
prev_result
self
.
output_dir
=
output_dir
self
.
n_clusters
=
_n_clusters
def
_read
(
self
):
t1
=
time
.
time
()
ids
=
list
()
data
=
list
()
items
=
list
()
count
=
0
with
codecs
.
open
(
self
.
filename
,
'r'
,
encoding
=
'utf-8'
)
as
f
:
for
line
in
f
:
arr
=
line
.
rstrip
().
split
(
'
\t
'
)
if
not
arr
:
break
elif
len
(
arr
)
==
1
:
label
=
arr
[
0
]
emb_vec
=
(
np
.
random
.
random_sample
(
(
self
.
emb_size
,
))).
tolist
()
elif
len
(
arr
)
==
2
:
label
=
arr
[
1
]
emb_vec
=
arr
[
0
].
split
()
if
len
(
emb_vec
)
!=
self
.
emb_size
:
continue
if
label
in
items
:
index
=
items
.
index
(
label
)
for
i
in
range
(
0
,
len
(
emb_vec
)):
data
[
index
][
i
+
1
]
+=
float
(
emb_vec
[
i
])
data
[
index
][
0
]
+=
1
else
:
items
.
append
(
label
)
ids
.
append
(
count
)
count
+=
1
vector
=
list
()
vector
.
append
(
1
)
for
i
in
range
(
0
,
len
(
emb_vec
)):
vector
.
append
(
float
(
emb_vec
[
i
]))
data
.
append
(
vector
)
for
i
in
range
(
len
(
data
)):
data_len
=
len
(
data
[
0
])
for
j
in
range
(
1
,
data_len
):
data
[
i
][
j
]
/=
data
[
i
][
0
]
data
[
i
]
=
data
[
i
][
1
:]
self
.
ids
=
np
.
array
(
ids
)
self
.
data
=
np
.
array
(
data
)
self
.
items
=
np
.
array
(
items
)
t2
=
time
.
time
()
print
(
"Read data done, {} records read, elapsed: {}"
.
format
(
len
(
ids
),
t2
-
t1
))
def
train
(
self
):
''' Cluster data '''
self
.
_read
()
queue
=
mp
.
Queue
()
self
.
process_prev_result
(
queue
)
processes
=
[]
pipes
=
[]
for
_
in
range
(
self
.
parall
):
a
,
b
=
mp
.
Pipe
()
p
=
mp
.
Process
(
target
=
self
.
_train
,
args
=
(
b
,
queue
))
processes
.
append
(
p
)
pipes
.
append
(
a
)
p
.
start
()
self
.
codes
=
np
.
zeros
((
len
(
self
.
ids
),
),
dtype
=
np
.
int64
)
for
pipe
in
pipes
:
codes
=
pipe
.
recv
()
for
i
in
range
(
len
(
codes
)):
if
codes
[
i
]
>
0
:
self
.
codes
[
i
]
=
codes
[
i
]
for
p
in
processes
:
p
.
join
()
assert
(
queue
.
empty
())
builder
=
tree_builder
.
TreeBuilder
(
self
.
output_dir
,
self
.
n_clusters
)
builder
.
build
(
self
.
ids
,
self
.
codes
,
items
=
self
.
items
,
data
=
self
.
data
)
def
process_prev_result
(
self
,
queue
):
if
not
self
.
prev_result
:
queue
.
put
((
0
,
np
.
array
(
range
(
len
(
self
.
ids
)))))
return
True
di
=
dict
()
for
i
,
node_id
in
enumerate
(
self
.
ids
):
di
[
node_id
]
=
i
indexes
=
[]
clusters
=
[]
with
open
(
self
.
prev_result
)
as
f
:
for
line
in
f
:
arr
=
line
.
split
(
","
)
if
arr
<
2
:
break
ni
=
[
di
[
int
(
m
)]
for
m
in
arr
]
clusters
.
append
(
ni
)
indexes
+=
ni
assert
len
(
set
(
indexes
))
==
len
(
self
.
ids
),
\
"ids count: {}, index count: {}"
.
format
(
len
(
self
.
ids
),
len
(
set
(
indexes
)))
count
=
len
(
clusters
)
assert
(
count
&
(
count
-
1
))
==
0
,
\
"Prev cluster count: {}"
.
format
(
count
)
for
i
,
ni
in
enumerate
(
clusters
):
queue
.
put
((
i
+
count
-
1
,
np
.
array
(
ni
)))
return
True
def
_train
(
self
,
pipe
,
queue
):
last_size
=
-
1
catch_time
=
0
processed
=
False
code
=
np
.
zeros
((
len
(
self
.
ids
),
),
dtype
=
np
.
int64
)
while
True
:
for
_
in
range
(
3
):
try
:
pcode
,
index
=
queue
.
get
(
timeout
=
self
.
timeout
)
except
:
index
=
None
if
index
is
not
None
:
break
if
index
is
None
:
if
processed
and
(
last_size
<=
self
.
mini_batch
or
catch_time
>=
3
):
print
(
"Process {} exits"
.
format
(
os
.
getpid
()))
break
else
:
print
(
"Got empty job, pid: {}, time: {}"
.
format
(
os
.
getpid
(
),
catch_time
))
catch_time
+=
1
continue
processed
=
True
catch_time
=
0
last_size
=
len
(
index
)
if
last_size
<=
self
.
mini_batch
:
self
.
_minbatch
(
pcode
,
index
,
code
)
else
:
start
=
time
.
time
()
sub_index
=
self
.
_cluster
(
index
)
if
last_size
>
self
.
mini_batch
:
print
(
"Train iteration done, pcode:{}, "
"data size: {}, elapsed time: {}"
.
format
(
pcode
,
len
(
index
),
time
.
time
()
-
start
))
self
.
timeout
=
int
(
0.4
*
self
.
timeout
+
0.6
*
(
time
.
time
()
-
start
))
if
self
.
timeout
<
5
:
self
.
timeout
=
5
for
i
in
range
(
self
.
n_clusters
):
if
len
(
sub_index
[
i
])
>
1
:
queue
.
put
(
(
self
.
n_clusters
*
pcode
+
i
+
1
,
sub_index
[
i
]))
process_count
=
0
for
c
in
code
:
if
c
>
0
:
process_count
+=
1
print
(
"Process {} process {} items"
.
format
(
os
.
getpid
(),
process_count
))
pipe
.
send
(
code
)
def
_minbatch
(
self
,
pcode
,
index
,
code
):
dq
=
collections
.
deque
()
dq
.
append
((
pcode
,
index
))
batch_size
=
len
(
index
)
tstart
=
time
.
time
()
while
dq
:
pcode
,
index
=
dq
.
popleft
()
if
len
(
index
)
<=
self
.
n_clusters
:
for
i
in
range
(
len
(
index
)):
code
[
index
[
i
]]
=
self
.
n_clusters
*
pcode
+
i
+
1
continue
sub_index
=
self
.
_cluster
(
index
)
for
i
in
range
(
self
.
n_clusters
):
if
len
(
sub_index
[
i
])
>
1
:
dq
.
append
((
self
.
n_clusters
*
pcode
+
i
+
1
,
sub_index
[
i
]))
elif
len
(
sub_index
[
i
])
>
0
:
for
j
in
range
(
len
(
sub_index
[
i
])):
code
[
sub_index
[
i
][
j
]]
=
self
.
n_clusters
*
\
pcode
+
i
+
j
+
1
print
(
"Minbatch, batch size: {}, elapsed: {}"
.
format
(
batch_size
,
time
.
time
()
-
tstart
))
def
_cluster
(
self
,
index
):
data
=
self
.
data
[
index
]
kmeans
=
KMeans
(
n_clusters
=
self
.
n_clusters
,
random_state
=
0
).
fit
(
data
)
labels
=
kmeans
.
labels_
sub_indexes
=
[]
remain_index
=
[]
ave_num
=
len
(
index
)
/
self
.
n_clusters
for
i
in
range
(
self
.
n_clusters
):
sub_i
=
np
.
where
(
labels
==
i
)[
0
]
sub_index
=
index
[
sub_i
]
if
len
(
sub_index
)
<=
ave_num
:
sub_indexes
.
append
(
sub_index
)
else
:
distances
=
kmeans
.
transform
(
data
[
sub_i
])[:,
i
]
sorted_index
=
sub_index
[
np
.
argsort
(
distances
)]
sub_indexes
.
append
(
sorted_index
[:
ave_num
])
remain_index
.
extend
(
list
(
sorted_index
[
ave_num
:]))
idx
=
0
while
idx
<
self
.
n_clusters
and
len
(
remain_index
)
>
0
:
if
len
(
sub_indexes
[
idx
])
>=
ave_num
:
idx
+=
1
else
:
diff
=
min
(
len
(
remain_index
),
ave_num
-
len
(
sub_indexes
[
idx
]))
sub_indexes
[
idx
]
=
np
.
append
(
sub_indexes
[
idx
],
np
.
array
(
remain_index
[
0
:
diff
]))
remain_index
=
remain_index
[
diff
:]
idx
+=
1
if
len
(
remain_index
)
>
0
:
sub_indexes
[
0
]
=
np
.
append
(
sub_indexes
[
0
],
np
.
array
(
remain_index
))
return
sub_indexes
def
_cluster1
(
self
,
index
):
pass
def
_rebalance
(
self
,
lindex
,
rindex
,
distances
):
sorted_index
=
rindex
[
np
.
argsort
(
distances
)]
idx
=
np
.
concatenate
((
lindex
,
sorted_index
))
mid
=
int
(
len
(
idx
)
/
2
)
return
idx
[
mid
:],
idx
[:
mid
]
if
__name__
==
"__main__"
:
parser
=
argparse
.
ArgumentParser
(
description
=
"Tree cluster"
)
parser
.
add_argument
(
"--embed_file"
,
required
=
True
,
help
=
"filename of the embedded vector file"
)
parser
.
add_argument
(
"--emb_size"
,
type
=
int
,
default
=
64
,
help
=
"dimension of input embedded vector"
)
parser
.
add_argument
(
"--id_offset"
,
default
=
None
,
help
=
"id offset of the generated tree internal node"
)
parser
.
add_argument
(
"--parall"
,
type
=
int
,
default
=
16
,
help
=
"Parall execution process number"
)
parser
.
add_argument
(
"--prev_result"
,
default
=
None
,
help
=
"filename of the previous cluster reuslt"
)
argments
=
parser
.
parse_args
()
t1
=
time
.
time
()
cluster
=
Cluster
(
argments
.
embed_file
,
argments
.
emb_size
,
argments
.
id_offset
,
argments
.
parall
,
argments
.
prev_result
)
cluster
.
train
()
t2
=
time
.
time
()
print
(
"Train complete successfully, elapsed: {}"
.
format
(
t2
-
t1
))
models/treebased/tdm/gen_tree/emb_util.py
0 → 100644
浏览文件 @
6de9c8da
# -*- coding=utf8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
os
import
paddle
import
paddle.fluid
as
fluid
import
numpy
as
np
import
json
import
argparse
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
"--mode"
,
default
=
"create_fake_emb"
,
choices
=
[
"create_fake_emb"
,
"save_item_emb"
],
type
=
str
,
help
=
"."
)
parser
.
add_argument
(
"--emb_id_nums"
,
default
=
13
,
type
=
int
,
help
=
"."
)
parser
.
add_argument
(
"--emb_shape"
,
default
=
64
,
type
=
int
,
help
=
"."
)
parser
.
add_argument
(
"--emb_path"
,
default
=
'./item_emb.txt'
,
type
=
str
,
help
=
'.'
)
args
=
parser
.
parse_args
()
def
create_fake_emb
(
emb_id_nums
,
emb_shape
,
emb_path
):
x
=
fluid
.
data
(
name
=
"item"
,
shape
=
[
1
],
lod_level
=
1
,
dtype
=
"int64"
)
# use layers.embedding to init emb value
item_emb
=
fluid
.
layers
.
embedding
(
input
=
x
,
is_sparse
=
True
,
size
=
[
emb_id_nums
,
emb_shape
],
param_attr
=
fluid
.
ParamAttr
(
name
=
"Item_Emb"
,
initializer
=
fluid
.
initializer
.
TruncatedNormal
(
loc
=
0.0
,
scale
=
2.0
)))
# run startup to init emb tensor
exe
=
fluid
.
Executor
(
fluid
.
CPUPlace
())
exe
.
run
(
fluid
.
default_startup_program
())
# get np.array(emb_tensor)
print
(
"Get Emb"
)
item_emb_array
=
np
.
array
(
fluid
.
global_scope
().
find_var
(
"Item_Emb"
)
.
get_tensor
())
with
open
(
emb_path
,
'w+'
)
as
f
:
emb_str
=
""
for
index
,
value
in
enumerate
(
item_emb_array
):
line
=
[]
for
v
in
value
:
line
.
append
(
str
(
v
))
line_str
=
" "
.
join
(
line
)
line_str
+=
"
\t
"
line_str
+=
str
(
index
)
line_str
+=
"
\n
"
emb_str
+=
line_str
f
.
write
(
emb_str
)
print
(
"Item Emb write Finish"
)
if
__name__
==
"__main__"
:
create_fake_emb
(
args
.
emb_id_nums
,
args
.
emb_shape
,
args
.
emb_path
)
models/treebased/tdm/gen_tree/gen_tree.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
os
import
argparse
from
cluster
import
Cluster
import
time
import
argparse
from
tree_search_util
import
tree_search_main
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
"--emd_path"
,
default
=
''
,
type
=
str
,
help
=
"."
)
parser
.
add_argument
(
"--emb_size"
,
default
=
64
,
type
=
int
,
help
=
"."
)
parser
.
add_argument
(
"--threads"
,
default
=
1
,
type
=
int
,
help
=
"."
)
parser
.
add_argument
(
"--n_clusters"
,
default
=
3
,
type
=
int
,
help
=
"."
)
parser
.
add_argument
(
"--output_dir"
,
default
=
''
,
type
=
str
,
help
=
'.'
)
args
=
parser
.
parse_args
()
def
main
():
cur_time
=
time
.
strftime
(
'%Y-%m-%d %H:%M:%S'
,
time
.
localtime
(
time
.
time
()))
if
not
os
.
path
.
exists
(
args
.
output_dir
):
os
.
system
(
"mkdir -p "
+
args
.
output_dir
)
print
(
'%s start build tree'
%
cur_time
)
# 1. Tree clustering, generating two files in current directory, tree.pkl, id2item.json
cluster
=
Cluster
(
args
.
emd_path
,
args
.
emb_size
,
parall
=
args
.
threads
,
output_dir
=
args
.
output_dir
,
_n_clusters
=
args
.
n_clusters
)
cluster
.
train
()
# 2. Tree searching, generating tree_info, travel_list, layer_list for train process.
tree_search_main
(
os
.
path
.
join
(
args
.
output_dir
,
"tree.pkl"
),
os
.
path
.
join
(
args
.
output_dir
,
"id2item.json"
),
args
.
output_dir
,
args
.
n_clusters
)
if
__name__
==
"__main__"
:
main
()
models/treebased/tdm/gen_tree/tree_builder.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (C) 2016-2018 Alibaba Group Holding Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
numpy
as
np
import
sys
import
os
import
codecs
from
tree_impl
import
_build
_CUR_DIR
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
sys
.
path
.
append
(
os
.
path
.
join
(
_CUR_DIR
,
".."
))
class
TreeBuilder
:
def
__init__
(
self
,
output_dir
=
'./'
,
n_clusters
=
2
):
self
.
output_dir
=
output_dir
self
.
n_clusters
=
n_clusters
def
build
(
self
,
ids
,
codes
,
data
=
None
,
items
=
None
,
id_offset
=
None
,
):
_build
(
ids
,
codes
,
data
,
items
,
self
.
output_dir
,
self
.
n_clusters
)
def
_ancessors
(
self
,
code
):
ancs
=
[]
while
code
>
0
:
code
=
int
((
code
-
1
)
/
2
)
ancs
.
append
(
code
)
return
ancs
models/treebased/tdm/gen_tree/tree_impl.py
0 → 100644
浏览文件 @
6de9c8da
# Copyright (C) 2016-2018 Alibaba Group Holding Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
anytree
import
NodeMixin
,
RenderTree
import
numpy
as
np
from
anytree.exporter.dictexporter
import
DictExporter
import
pickle
import
json
import
os
import
time
class
BaseClass
(
object
):
pass
class
TDMTreeClass
(
BaseClass
,
NodeMixin
):
def
__init__
(
self
,
key_code
,
emb_vec
,
ids
=
None
,
text
=
None
,
parent
=
None
,
children
=
None
):
super
(
TDMTreeClass
,
self
).
__init__
()
self
.
key_code
=
key_code
self
.
ids
=
ids
self
.
emb_vec
=
emb_vec
self
.
text
=
text
self
.
parent
=
parent
if
children
:
self
.
children
=
children
def
set_parent
(
self
,
parent
):
self
.
parent
=
parent
def
set_children
(
self
,
children
):
self
.
children
=
children
def
_build
(
ids
,
codes
,
data
,
items
,
output_dir
,
n_clusters
=
2
):
code_list
=
[
0
]
*
50000000
node_dict
=
{}
max_code
=
0
id2item
=
{}
curtime
=
time
.
strftime
(
'%Y-%m-%d %H:%M:%S'
,
time
.
localtime
(
time
.
time
()))
print
(
'%s start gen code_list'
%
curtime
)
for
_id
,
code
,
datum
,
item
in
zip
(
ids
,
codes
,
data
,
items
):
code_list
[
code
]
=
[
datum
,
_id
]
id2item
[
str
(
_id
)]
=
item
max_code
=
max
(
code
,
max_code
)
ancessors
=
_ancessors
(
code
,
n_clusters
)
for
ancessor
in
ancessors
:
code_list
[
ancessor
]
=
[[]]
for
code
in
range
(
max_code
,
-
1
,
-
1
):
if
code_list
[
code
]
==
0
:
continue
if
len
(
code_list
[
code
])
>
1
:
pass
elif
len
(
code_list
[
code
])
==
1
:
code_list
[
code
][
0
]
=
np
.
mean
(
code_list
[
code
][
0
],
axis
=
0
)
if
code
>
0
:
ancessor
=
int
((
code
-
1
)
/
n_clusters
)
code_list
[
ancessor
][
0
].
append
(
code_list
[
code
][
0
])
print
(
'start gen node_dict'
)
for
code
in
range
(
0
,
max_code
+
1
):
if
code_list
[
code
]
==
0
:
continue
if
len
(
code_list
[
code
])
>
1
:
[
datum
,
_id
]
=
code_list
[
code
]
node_dict
[
code
]
=
TDMTreeClass
(
code
,
emb_vec
=
datum
,
ids
=
_id
)
elif
len
(
code_list
[
code
])
==
1
:
[
datum
]
=
code_list
[
code
]
node_dict
[
code
]
=
TDMTreeClass
(
code
,
emb_vec
=
datum
)
if
code
>
0
:
ancessor
=
int
((
code
-
1
)
/
n_clusters
)
node_dict
[
code
].
set_parent
(
node_dict
[
ancessor
])
save_tree
(
node_dict
[
0
],
os
.
path
.
join
(
output_dir
,
'tree.pkl'
))
save_dict
(
id2item
,
os
.
path
.
join
(
output_dir
,
'id2item.json'
))
def
render
(
root
):
for
row
in
RenderTree
(
root
,
childiter
=
reversed
):
print
(
"%s%s"
%
(
row
.
pre
,
row
.
node
.
text
))
def
save_tree
(
root
,
path
):
print
(
'save tree to %s'
%
path
)
exporter
=
DictExporter
()
data
=
exporter
.
export
(
root
)
f
=
open
(
path
,
'wb'
)
pickle
.
dump
(
data
,
f
)
f
.
close
()
def
save_dict
(
dic
,
filename
):
"""save dict into json file"""
print
(
'save dict to %s'
%
filename
)
with
open
(
filename
,
"w"
)
as
json_file
:
json
.
dump
(
dic
,
json_file
,
ensure_ascii
=
False
)
def
_ancessors
(
code
,
n_clusters
):
ancs
=
[]
while
code
>
0
:
code
=
int
((
code
-
1
)
/
n_clusters
)
ancs
.
append
(
code
)
return
ancs
models/treebased/tdm/gen_tree/tree_search_util.py
0 → 100644
浏览文件 @
6de9c8da
# -*- coding=utf8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
json
import
pickle
import
time
import
os
import
numpy
as
np
from
anytree
import
(
AsciiStyle
,
LevelOrderGroupIter
,
LevelOrderIter
,
Node
,
NodeMixin
,
RenderTree
)
from
anytree.importer.dictimporter
import
DictImporter
from
anytree.iterators.abstractiter
import
AbstractIter
from
anytree.walker
import
Walker
from
tree_impl
import
TDMTreeClass
class
myLevelOrderIter
(
AbstractIter
):
@
staticmethod
def
_iter
(
children
,
filter_
,
stop
,
maxlevel
):
level
=
1
while
children
:
next_children
=
[]
for
child
in
children
:
if
filter_
(
child
):
yield
child
,
level
next_children
+=
AbstractIter
.
_get_children
(
child
.
children
,
stop
)
children
=
next_children
level
+=
1
if
AbstractIter
.
_abort_at_level
(
level
,
maxlevel
):
break
class
Tree_search
(
object
):
def
__init__
(
self
,
tree_path
,
id2item_path
,
child_num
=
2
):
self
.
root
=
None
self
.
id2item
=
None
self
.
item2id
=
None
self
.
child_num
=
child_num
self
.
load
(
tree_path
)
# self.load_id2item(id2item_path)
self
.
level_code
=
[[]]
self
.
max_level
=
0
self
.
keycode_id_dict
=
{}
# embedding
self
.
keycode_nodeid_dict
=
{}
self
.
tree_info
=
[]
self
.
id_node_dict
=
{}
self
.
get_keycode_mapping
()
self
.
travel_tree
()
self
.
get_children
()
def
get_keycode_mapping
(
self
):
nodeid
=
0
self
.
embedding
=
[]
print
(
"Begin Keycode Mapping"
)
for
node
in
myLevelOrderIter
(
self
.
root
):
node
,
level
=
node
if
level
-
1
>
self
.
max_level
:
self
.
max_level
=
level
-
1
self
.
level_code
.
append
([])
if
node
.
ids
is
not
None
:
self
.
keycode_id_dict
[
node
.
key_code
]
=
node
.
ids
self
.
id_node_dict
[
node
.
ids
]
=
node
self
.
keycode_nodeid_dict
[
node
.
key_code
]
=
nodeid
self
.
level_code
[
self
.
max_level
].
append
(
nodeid
)
node_infos
=
[]
if
node
.
ids
is
not
None
:
# item_id
node_infos
.
append
(
node
.
ids
)
else
:
node_infos
.
append
(
0
)
node_infos
.
append
(
self
.
max_level
)
# layer_id
if
node
.
parent
:
# ancestor_id
node_infos
.
append
(
self
.
keycode_nodeid_dict
[
node
.
parent
.
key_code
])
else
:
node_infos
.
append
(
0
)
self
.
tree_info
.
append
(
node_infos
)
self
.
embedding
.
append
(
node
.
emb_vec
)
nodeid
+=
1
if
nodeid
%
1000
==
0
:
print
(
"travel node id {}"
.
format
(
nodeid
))
def
load
(
self
,
path
):
print
(
"Begin Load Tree"
)
f
=
open
(
path
,
"rb"
)
data
=
pickle
.
load
(
f
)
pickle
.
dump
(
data
,
open
(
path
,
"wb"
),
protocol
=
2
)
importer
=
DictImporter
()
self
.
root
=
importer
.
import_
(
data
)
f
.
close
()
def
load_id2item
(
self
,
path
):
"""load dict from json file"""
with
open
(
path
,
"rb"
)
as
json_file
:
self
.
id2item
=
json
.
load
(
json_file
)
self
.
item2id
=
{
value
:
int
(
key
)
for
key
,
value
in
self
.
id2item
.
items
()}
def
get_children
(
self
):
"""get every node children info"""
print
(
"Begin Keycode Mapping"
)
for
node
in
myLevelOrderIter
(
self
.
root
):
node
,
level
=
node
node_id
=
self
.
keycode_nodeid_dict
[
node
.
key_code
]
child_idx
=
0
if
node
.
children
:
for
child
in
node
.
children
:
self
.
tree_info
[
node_id
].
append
(
self
.
keycode_nodeid_dict
[
child
.
key_code
])
child_idx
+=
1
while
child_idx
<
self
.
child_num
:
self
.
tree_info
[
node_id
].
append
(
0
)
child_idx
+=
1
if
node_id
%
1000
==
0
:
print
(
"get children node id {}"
.
format
(
node_id
))
def
travel_tree
(
self
):
self
.
travel_list
=
[]
tree_walker
=
Walker
()
print
(
"Begin Travel Tree"
)
for
item
in
sorted
(
self
.
id_node_dict
.
keys
()):
node
=
self
.
id_node_dict
[
int
(
item
)]
paths
,
_
,
_
=
tree_walker
.
walk
(
node
,
self
.
root
)
paths
=
list
(
paths
)
paths
.
reverse
()
travel
=
[
self
.
keycode_nodeid_dict
[
i
.
key_code
]
for
i
in
paths
]
while
len
(
travel
)
<
self
.
max_level
:
travel
.
append
(
0
)
self
.
travel_list
.
append
(
travel
)
def
tree_search_main
(
tree_path
,
id2item_path
,
output_dir
,
n_clusters
=
2
):
print
(
"Begin Tree Search"
)
t
=
Tree_search
(
tree_path
,
id2item_path
,
n_clusters
)
# 1. Walk all leaf nodes, get travel path array
travel_list
=
np
.
array
(
t
.
travel_list
)
np
.
save
(
os
.
path
.
join
(
output_dir
,
"travel_list.npy"
),
travel_list
)
with
open
(
os
.
path
.
join
(
output_dir
,
"travel_list.txt"
),
'w'
)
as
fout
:
for
i
,
travel
in
enumerate
(
t
.
travel_list
):
travel
=
map
(
str
,
travel
)
fout
.
write
(
','
.
join
(
travel
))
fout
.
write
(
"
\n
"
)
print
(
"End Save tree travel"
)
# 2. Walk all layer of tree, get layer array
layer_num
=
0
with
open
(
os
.
path
.
join
(
output_dir
,
"layer_list.txt"
),
'w'
)
as
fout
:
for
layer
in
t
.
level_code
:
# exclude layer 0
if
layer_num
==
0
:
layer_num
+=
1
continue
for
idx
in
range
(
len
(
layer
)
-
1
):
fout
.
write
(
str
(
layer
[
idx
])
+
','
)
fout
.
write
(
str
(
layer
[
-
1
])
+
"
\n
"
)
print
(
"Layer {} has {} node, the first {}, the last {}"
.
format
(
layer_num
,
len
(
layer
),
layer
[
0
],
layer
[
-
1
]))
layer_num
+=
1
print
(
"End Save tree layer"
)
# 3. Walk all node of tree, get tree info
tree_info
=
np
.
array
(
t
.
tree_info
)
np
.
save
(
os
.
path
.
join
(
output_dir
,
"tree_info.npy"
),
tree_info
)
with
open
(
os
.
path
.
join
(
output_dir
,
"tree_info.txt"
),
'w'
)
as
fout
:
for
i
,
node_infos
in
enumerate
(
t
.
tree_info
):
node_infos
=
map
(
str
,
node_infos
)
fout
.
write
(
','
.
join
(
node_infos
))
fout
.
write
(
"
\n
"
)
print
(
"End Save tree info"
)
# 4. save embedding
embedding
=
np
.
array
(
t
.
embedding
)
np
.
save
(
os
.
path
.
join
(
output_dir
,
"tree_emb.npy"
),
embedding
)
with
open
(
os
.
path
.
join
(
output_dir
,
"tree_embedding.txt"
),
"w"
)
as
fout
:
for
i
,
emb
in
enumerate
(
t
.
embedding
):
emb
=
map
(
str
,
emb
)
fout
.
write
(
','
.
join
(
emb
))
fout
.
write
(
"
\n
"
)
if
__name__
==
"__main__"
:
tree_path
=
"./tree.pkl"
id2item_path
=
"./id2item.json"
output_dir
=
"./output"
if
not
os
.
path
.
exists
(
output_dir
):
os
.
system
(
"mkdir -p "
+
output_dir
)
tree_search_main
(
tree_path
,
id2item_path
,
output_dir
)
run.py
浏览文件 @
6de9c8da
...
...
@@ -348,6 +348,7 @@ def cluster_engine(args):
cluster_envs
[
"fleet_mode"
]
=
fleet_mode
cluster_envs
[
"engine_role"
]
=
"WORKER"
cluster_envs
[
"log_dir"
]
=
"logs"
cluster_envs
[
"train.trainer.trainer"
]
=
trainer
cluster_envs
[
"train.trainer.engine"
]
=
"cluster"
cluster_envs
[
"train.trainer.executor_mode"
]
=
executor_mode
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录